[
https://issues.apache.org/jira/browse/BEAM-11997?focusedWorklogId=667875&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-667875
]
ASF GitHub Bot logged work on BEAM-11997:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Oct/21 16:54
Start Date: 20/Oct/21 16:54
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #15549:
URL: https://github.com/apache/beam/pull/15549#discussion_r732945649
##########
File path:
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -334,94 +336,85 @@ public void setup() {
public void teardown() {
jedis.close();
}
- }
-
- private static class ReadKeysWithPattern extends BaseReadFn<String> {
- ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) {
- super(connectionConfiguration);
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction(@Element String pattern) {
+ return new OffsetRange(0, getKeyPatters(pattern).size());
}
@ProcessElement
- public void processElement(ProcessContext c) {
- ScanParams scanParams = new ScanParams();
- scanParams.match(c.element());
-
- String cursor = ScanParams.SCAN_POINTER_START;
- boolean finished = false;
- while (!finished) {
- ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
- List<String> keys = scanResult.getResult();
- for (String k : keys) {
- c.output(k);
+ public void processElement(ProcessContext c,
RestrictionTracker<OffsetRange, Long> tracker) {
Review comment:
I was under the impression that you wanted to add support for
checkpointing only and didn't want to support generic splitting, since that
would require you to generate cursors using the information from
https://stackoverflow.com/a/59569053.
In the checkpointing only case you would do something like:
```
processElement(...) {
  cursor = restriction.getFrom()
  while (true) {
    values, nextCursor = redis.scan(cursor)
    if (!tryClaim(nextCursor)) { // claims [cursor, nextCursor)
      return STOP
    }
    output values
    if (nextCursor == END) {
      return STOP
    }
    cursor = nextCursor
  }
  return STOP
}
```
You would be claiming ranges `[cursor, nextCursor)` (or `[cursor, END]` if
`nextCursor == END`). `tryClaim` would fail if `cursor >= end`. This requires
you to still implement a subset of https://stackoverflow.com/a/59569053 so that
you can compare `cursor` against the restriction end.
Note that since you can't control which cursor SCAN returns next, you
will output duplicates and would need a deduplication transform to
follow it.
When `tryClaim(nextCursor)` succeeds it would update its restriction from
`[from, end]` to `[nextCursor, end]`. A restriction would be considered done
when `from >= end` (except for the special case where the range is `[0, 0]`,
representing start and end). Runner-initiated checkpointing (e.g. splitting
with fractionOfRemainder == 0) in the restriction tracker would be allowed if
`from != end`; it would make the current restriction `[from, from]`
(effectively an empty range) and return a residual range `[from, end]`.
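
To make the tracker semantics above concrete, here is a minimal plain-Java
sketch with no Beam or Jedis dependency. Everything in it is hypothetical
and for illustration only: `CursorCheckpointSketch`, `CursorTracker`, the
in-memory `scan` stand-in and the `checkpointAfter` parameter are not part of
the PR. It also assumes cursors can be compared numerically, which is not true
of real SCAN cursors; a real tracker would need the ordering from the Stack
Overflow answer linked above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CursorCheckpointSketch {
  static final long END = 0L; // Redis SCAN reports cursor 0 once the iteration is complete.

  /** Hypothetical checkpoint-only tracker for a cursor restriction [from, end]. */
  static final class CursorTracker {
    long from;
    long end; // END (0) as the end means "until SCAN finishes".
    boolean finished; // Set once the final range [from, END] has been claimed.

    CursorTracker(long from, long end) {
      this.from = from;
      this.end = end;
    }

    // ASSUMPTION: numeric comparison. Real SCAN cursors are not ordered this way.
    private boolean atOrPastEnd(long cursor) {
      return end != END && cursor >= end;
    }

    /** Claims [from, nextCursor), or [from, END] when nextCursor == END. */
    boolean tryClaim(long nextCursor) {
      if (finished || atOrPastEnd(from)) {
        return false;
      }
      if (nextCursor == END) {
        finished = true;
      } else {
        from = nextCursor;
      }
      return true;
    }

    /** Returns the residual {from, end}, or null when from == end or the work is done. */
    long[] checkpoint() {
      if (finished || from == end) {
        return null;
      }
      long[] residual = {from, end};
      end = from; // The primary restriction becomes the empty range [from, from].
      return residual;
    }
  }

  /** In-memory stand-in for SCAN: the cursor is a list index and a page holds two keys. */
  static long scan(List<String> keys, long cursor, List<String> out) {
    int start = (int) cursor;
    int stop = Math.min(start + 2, keys.size());
    out.addAll(keys.subList(start, stop));
    return stop == keys.size() ? END : stop;
  }

  /** Mirrors the review's loop; a checkpoint is simulated before page number checkpointAfter. */
  static long[] process(
      List<String> keys, CursorTracker tracker, List<String> output, int checkpointAfter) {
    long[] residual = null;
    long cursor = tracker.from;
    for (int page = 0; ; page++) {
      if (page == checkpointAfter) {
        residual = tracker.checkpoint(); // Simulates a runner-initiated checkpoint.
      }
      List<String> values = new ArrayList<>();
      long nextCursor = scan(keys, cursor, values);
      if (!tracker.tryClaim(nextCursor)) {
        return residual; // Claim failed: discard this page and stop.
      }
      output.addAll(values);
      if (nextCursor == END) {
        return residual;
      }
      cursor = nextCursor;
    }
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("a", "b", "c", "d", "e");
    List<String> output = new ArrayList<>();
    long[] residual = process(keys, new CursorTracker(0, END), output, 1);
    // Resume from the residual restriction; -1 means no further checkpoint.
    process(keys, new CursorTracker(residual[0], residual[1]), output, -1);
    System.out.println(output);
  }
}
```

Note that the page scanned after the checkpoint is dropped when `tryClaim`
fails and is scanned again by the residual. Because the stand-in `scan` is
deterministic, this sketch produces no duplicates; a real SCAN gives no such
guarantee, which is why a deduplication transform would still be needed.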
Issue Time Tracking
-------------------
Worklog Id: (was: 667875)
Time Spent: 2h 40m (was: 2.5h)
> Implement RedisIO on top of Splittable DoFn
> -------------------------------------------
>
> Key: BEAM-11997
> URL: https://issues.apache.org/jira/browse/BEAM-11997
> Project: Beam
> Issue Type: Improvement
> Components: io-java-redis
> Reporter: Boyuan Zhang
> Assignee: Miguel Anzo
> Priority: P2
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>