fernando-wizeline commented on a change in pull request #15549:
URL: https://github.com/apache/beam/pull/15549#discussion_r826382014
##########
File path:
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -334,95 +337,33 @@ public void setup() {
public void teardown() {
jedis.close();
}
- }
-
- private static class ReadKeysWithPattern extends BaseReadFn<String> {
- ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) {
- super(connectionConfiguration);
+ @GetInitialRestriction
+ public ByteKeyRange getInitialRestriction() {
+ return ByteKeyRange.of(ByteKey.of(0x00), ByteKey.EMPTY);
}
@ProcessElement
- public void processElement(ProcessContext c) {
+ public void processElement(
+ ProcessContext c, RestrictionTracker<ByteKeyRange, ByteKey> tracker) {
+ ByteKey cursor = tracker.currentRestriction().getStartKey();
+ RedisCursor redisCursor = RedisCursor.byteKeyToRedisCursor(cursor,
jedis.dbSize(), true);
ScanParams scanParams = new ScanParams();
scanParams.match(c.element());
-
- String cursor = ScanParams.SCAN_POINTER_START;
- boolean finished = false;
- while (!finished) {
- ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
- List<String> keys = scanResult.getResult();
- for (String k : keys) {
- c.output(k);
- }
- cursor = scanResult.getCursor();
- if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
- finished = true;
- }
- }
- }
- }
-
- /** A {@link DoFn} requesting Redis server to get key/value pairs. */
- private static class ReadFn extends BaseReadFn<KV<String, String>> {
- transient @Nullable Multimap<BoundedWindow, String> bundles = null;
- @Nullable AtomicInteger batchCount = null;
- private final int batchSize;
-
- ReadFn(RedisConnectionConfiguration connectionConfiguration, int
batchSize) {
- super(connectionConfiguration);
- this.batchSize = batchSize;
- }
-
- @StartBundle
- public void startBundle() {
- bundles = ArrayListMultimap.create();
- batchCount = new AtomicInteger();
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) {
- String key = c.element();
- bundles.put(window, key);
- if (batchCount.incrementAndGet() > getBatchSize()) {
- Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
- for (BoundedWindow w : kvs.keySet()) {
- for (KV<String, String> kv : kvs.get(w)) {
- c.output(kv);
- }
- }
- }
- }
-
- @FinishBundle
- public void finishBundle(FinishBundleContext context) {
- Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
- for (BoundedWindow w : kvs.keySet()) {
- for (KV<String, String> kv : kvs.get(w)) {
- context.output(kv, w.maxTimestamp(), w);
- }
- }
- }
-
- private int getBatchSize() {
- return batchSize;
- }
-
- private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
- Multimap<BoundedWindow, KV<String, String>> kvs =
ArrayListMultimap.create();
- for (BoundedWindow w : bundles.keySet()) {
- String[] keys = new String[bundles.get(w).size()];
- bundles.get(w).toArray(keys);
- List<String> results = jedis.mget(keys);
- for (int i = 0; i < results.size(); i++) {
- if (results.get(i) != null) {
- kvs.put(w, KV.of(keys[i], results.get(i)));
+ while (tracker.tryClaim(cursor)) {
+ ScanResult<String> scanResult = jedis.scan(redisCursor.getCursor(),
scanParams);
+ if (scanResult.getResult().size() > 0) {
+ String[] keys = scanResult.getResult().toArray(new
String[scanResult.getResult().size()]);
+ List<String> results = jedis.mget(keys);
+ for (int i = 0; i < results.size(); i++) {
+ if (results.get(i) != null) {
+ c.output(KV.of(keys[i], results.get(i)));
+ }
}
}
+ redisCursor = RedisCursor.of(scanResult.getCursor(), jedis.dbSize(),
false);
Review comment:
Yes, that is correct; we are handling this case properly.
It is handled by the `tryClaim` loop on lines 353-361, since ByteKey
has a special case for the 0 value at the end of the cursor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]