[ 
https://issues.apache.org/jira/browse/BEAM-11997?focusedWorklogId=667875&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-667875
 ]

ASF GitHub Bot logged work on BEAM-11997:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Oct/21 16:54
            Start Date: 20/Oct/21 16:54
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #15549:
URL: https://github.com/apache/beam/pull/15549#discussion_r732945649



##########
File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -334,94 +336,85 @@ public void setup() {
     public void teardown() {
       jedis.close();
     }
-  }
-
-  private static class ReadKeysWithPattern extends BaseReadFn<String> {
 
-    ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) {
-      super(connectionConfiguration);
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element String pattern) {
+      return new OffsetRange(0, getKeyPatters(pattern).size());
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      ScanParams scanParams = new ScanParams();
-      scanParams.match(c.element());
-
-      String cursor = ScanParams.SCAN_POINTER_START;
-      boolean finished = false;
-      while (!finished) {
-        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
-        List<String> keys = scanResult.getResult();
-        for (String k : keys) {
-          c.output(k);
+    public void processElement(ProcessContext c, 
RestrictionTracker<OffsetRange, Long> tracker) {

Review comment:
       I was under the impression that you wanted to add support for 
checkpointing only and didn't want to support generic splitting since that 
would require you to generate cursors using the information from 
https://stackoverflow.com/a/59569053
   
   In the checkpointing only case you would do something like:
   ```
   processElement(...) {
     cursor = restriction.getFrom()
     while (true) {
       values, nextCursor = redis.scan(cursor)
       if (!tryClaim(nextCursor)) {  // claims [cursor, nextCursor)
         return STOP
       }
       output values
       if (nextCursor == END) {
         return STOP
       }
       cursor = nextCursor
     }
   }
   ```
   
   You would be claiming ranges `[cursor, nextCursor)` (or `[cursor, END]` if 
`nextCursor == END`). `tryClaim` would fail if `cursor >= end`. This requires 
you to still implement a subset of https://stackoverflow.com/a/59569053 so that 
you can compare `cursor` against the restriction end.
   
   Note that since you can't control which cursor SCAN returns next, you will 
output duplicates, so the read would need to be followed by a deduplication 
transform.
   
   When `tryClaim(nextCursor)` succeeds it would update its restriction 
`[from, end]` to `[nextCursor, end]`. A restriction would be considered done 
when `from >= end` (except for the special case where the range is `[0, 0]`, 
representing start and end). Runner-initiated checkpointing (e.g. splitting 
with `fractionOfRemainder == 0`) in the restriction tracker would be allowed if 
`from != end`; it would make the current restriction `[from, from]` 
(effectively an empty range) and return a residual range `[from, end]`.






Issue Time Tracking
-------------------

    Worklog Id:     (was: 667875)
    Time Spent: 2h 40m  (was: 2.5h)

> Implement RedisIO on top of Splittable DoFn
> -------------------------------------------
>
>                 Key: BEAM-11997
>                 URL: https://issues.apache.org/jira/browse/BEAM-11997
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-redis
>            Reporter: Boyuan Zhang
>            Assignee: Miguel Anzo
>            Priority: P2
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
