[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=123051&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-123051 ]

ASF GitHub Bot logged work on BEAM-3446:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Jul/18 21:21
            Start Date: 13/Jul/18 21:21
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #5841: Fixes https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841#discussion_r202475941
 
 

 ##########
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##########
 @@ -279,28 +290,31 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if ("0".equals(cursor)) {
           finished = true;
         }
       }
     }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+      super(connectionConfiguration);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) throws Exception {
+      String key = processContext.element();
+
+      String value = jedis.get(key);
 
 Review comment:
   Hi, sorry I missed your message. The idea is to add the DoFn startBundle
and finishBundle methods, and to add a method in Read that defines the maximum
number of elements requested at once. In processElement you only collect the
keys to be requested and do not issue the request there. Instead, in
finishBundle you issue an MGET request for each batch of at most that many keys.
We should pick a sensible default batch size, e.g. 1000. This is similar to
what other IOs do in their Write (see withBatchSize in ElasticsearchIO or
SolrIO, for reference):
   
https://github.com/apache/beam/blob/c14c975224af417dcdc74fed8b0d893be742e9d7/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L805-L829
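A minimal sketch of the batching idea from the review comment follows. It is written in plain Java so it runs without Beam or a Redis server. BatchedMgetReader mirrors the DoFn lifecycle (startBundle, processElement, finishBundle). processElement only buffers keys, and the buffered keys are fetched with one MGET per batch. MultiGetClient, BatchedMgetReader and DEFAULT_BATCH_SIZE are hypothetical names and are not part of RedisIO. MultiGetClient stands in for Jedis#mget(String...), which returns the values in key order, with null for keys that do not exist. The default of 1000 follows the reviewer's suggestion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/**
 * Hypothetical stand-in for Jedis#mget(String...): one value per key, in key order,
 * with null for keys that do not exist.
 */
interface MultiGetClient {
  List<String> mget(String... keys);
}

/** Hypothetical reader: buffers keys per bundle and reads them with one MGET per batch. */
final class BatchedMgetReader {
  /** Hypothetical default, following the review's suggestion of 1000. */
  static final int DEFAULT_BATCH_SIZE = 1000;

  private final MultiGetClient client;
  private final int batchSize;
  private final BiConsumer<String, String> output; // receives (key, value) pairs
  private final List<String> batch = new ArrayList<>();

  BatchedMgetReader(MultiGetClient client, int batchSize, BiConsumer<String, String> output) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.client = client;
    this.batchSize = batchSize;
    this.output = output;
  }

  /** Like @StartBundle: begin each bundle with an empty buffer. */
  void startBundle() {
    batch.clear();
  }

  /** Like @ProcessElement: only buffer the key; flush early once a batch is full. */
  void processElement(String key) {
    batch.add(key);
    if (batch.size() >= batchSize) {
      flush();
    }
  }

  /** Like @FinishBundle: fetch whatever is still buffered. */
  void finishBundle() {
    flush();
  }

  /** Issues one MGET for the buffered keys and emits the (key, value) pairs found. */
  private void flush() {
    if (batch.isEmpty()) {
      return;
    }
    List<String> values = client.mget(batch.toArray(new String[0]));
    for (int i = 0; i < batch.size(); i++) {
      String value = values.get(i);
      if (value != null) { // skip missing keys, like the single-GET example in the issue
        output.accept(batch.get(i), value);
      }
    }
    batch.clear();
  }
}
```

For example, with a batch size of 2 and the keys a, b, c, d, e, the reader issues three MGET calls: [a, b] and [c, d] from processElement, and [e] from finishBundle.

This variant also flushes as soon as a batch is full, which keeps per-bundle memory bounded by the batch size. A strict reading of the comment would instead defer every MGET to finishBundle. In a real Beam DoFn, anything flushed from @FinishBundle has to be emitted through FinishBundleContext.output(value, timestamp, window). The DoFn would therefore also need to track each key's timestamp and window. Treat that as a point to verify against the Beam version in use.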


Issue Time Tracking
-------------------

    Worklog Id:     (was: 123051)
    Time Spent: 3h 10m  (was: 3h)

> RedisIO non-prefix read operations
> ----------------------------------
>
>                 Key: BEAM-3446
>                 URL: https://issues.apache.org/jira/browse/BEAM-3446
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-redis
>            Reporter: Vinay varma
>            Assignee: Vinay varma
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The Read operation in RedisIO performs prefix-based lookups. While it can
> also be used for exact key matches, the number of operations it issues
> limits the throughput of the function.
> I suggest exposing the current readAll operation as readbyprefix and using
> simpler operations for the readAll functionality.
> For example:
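> {code:java}
> String element = processContext.element();
> // One GET per key; Jedis returns null when the key does not exist.
> String output = jedis.get(element);
> if (output != null) {
>     processContext.output(KV.of(element, output));
> }
> {code}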
> instead of:
> https://github.com/apache/beam/blob/7d240c0bb171af6868f1a6e95196c9dcfc9ac640/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L292
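The following is a rough, hypothetical cost model of the throughput argument in the issue, written in plain Java with no Redis or Jedis involved. It rests on one documented SCAN behavior: MATCH filters keys only after each page has been fetched. A prefix-style read for a single exact key therefore still walks the whole keyspace. The class and method names are made up for illustration. The model assumes that each SCAN call examines exactly COUNT keys, although Redis treats COUNT only as a hint (its default is 10). It also assumes the key exists, so one GET is needed for the page that holds it.

```java
/** Toy cost model, not a Redis client: round trips needed to read ONE existing key. */
final class ExactKeyLookupCost {
  private ExactKeyLookupCost() {}

  /**
   * Prefix-style read: SCAN ... MATCH walks every page of the keyspace, because MATCH
   * filters only after a page is fetched. Assumption: each SCAN examines exactly
   * {@code count} keys.
   */
  static long scanRoundTrips(long keyspaceSize, int count) {
    if (count <= 0) {
      throw new IllegalArgumentException("count must be positive");
    }
    long scans = 0;
    long cursor = 0;
    do {
      scans++;                                // one SCAN round trip
      cursor += count;                        // move past the keys this call examined
      if (cursor >= keyspaceSize) cursor = 0; // a returned cursor of "0" ends the iteration
    } while (cursor != 0);
    return scans + 1;                         // plus one GET for the page holding the key
  }

  /** Exact-key read, as proposed in the issue: a single GET. */
  static long getRoundTrips() {
    return 1;
  }
}
```

For example, scanRoundTrips(1_000_000, 10) returns 100001 (100000 SCAN calls plus one GET), compared with 1 for getRoundTrips(). Actual counts depend on how Redis lays out its hash table, so treat this only as an order-of-magnitude illustration.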



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
