lukecwik commented on a change in pull request #15549: URL: https://github.com/apache/beam/pull/15549#discussion_r737630613
########## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.redis; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import java.io.Serializable; +import javax.annotation.Nonnull; + +public class RedisCursor implements Comparable<RedisCursor>, Serializable { + + public static final String START_CURSOR = "0"; + public static final String END_CURSOR = "0"; + + private final String cursor; + private final long nKeys; + private final long index; + + public static RedisCursor of(String cursor, long nKeys) throws IllegalStateException { + if (nKeys == 0) { + throw new IllegalStateException("zero keys"); + } + return new RedisCursor(cursor, nKeys); + } + + private RedisCursor(String cursor, long nKeys) { + this.cursor = cursor; + this.nKeys = nKeys; + this.index = toIndex(); + } + + /** + * {@link RedisCursor} implements {@link Comparable Comparable<RedisCursor>} by transformig + * the cursors to an index of the Redis table. 
+ */ + @Override + public int compareTo(@Nonnull RedisCursor other) { + checkNotNull(other, "other"); + return Long.compare(index, other.index); + } + + private int toIndex() { + int cursorInt = Integer.parseInt(cursor); + StringBuilder cursorBuilder = new StringBuilder(Integer.toBinaryString(cursorInt)).reverse(); + double pow = getTablePow(); + while (cursorBuilder.length() < pow) { + cursorBuilder.append("0"); + } + return Integer.parseInt(cursorBuilder.toString(), 2); + } + + private double getTablePow() { Review comment: ```suggestion private int getTablePow() { ``` ########## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.redis; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import java.io.Serializable; +import javax.annotation.Nonnull; + +public class RedisCursor implements Comparable<RedisCursor>, Serializable { + + public static final String START_CURSOR = "0"; + public static final String END_CURSOR = "0"; + + private final String cursor; + private final long nKeys; + private final long index; + + public static RedisCursor of(String cursor, long nKeys) throws IllegalStateException { + if (nKeys == 0) { + throw new IllegalStateException("zero keys"); + } + return new RedisCursor(cursor, nKeys); + } + + private RedisCursor(String cursor, long nKeys) { + this.cursor = cursor; + this.nKeys = nKeys; + this.index = toIndex(); + } + + /** + * {@link RedisCursor} implements {@link Comparable Comparable<RedisCursor>} by transformig + * the cursors to an index of the Redis table. + */ + @Override + public int compareTo(@Nonnull RedisCursor other) { + checkNotNull(other, "other"); + return Long.compare(index, other.index); + } + + private int toIndex() { Review comment: ```suggestion @VisibleForTesting static long toIndex(String cursor, int nKeys) { ``` Consider making this method static and package-private so that you can test the cursor-to-index conversion, and therefore cursor comparison, directly. ########## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ########## @@ -334,94 +337,69 @@ public void setup() { public void teardown() { jedis.close(); } - } - private static class ReadKeysWithPattern extends BaseReadFn<String> { - - ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) { - super(connectionConfiguration); + @GetInitialRestriction + public RedisCursorRange getInitialRestriction() { + nKeys = jedis.dbSize(); Review comment: You need to get the table size and not the number of keys.
########## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ########## @@ -316,13 +314,18 @@ public ReadKeyPatterns withOutputParallelization(boolean outputParallelization) } } - private abstract static class BaseReadFn<T> extends DoFn<String, T> { - protected final RedisConnectionConfiguration connectionConfiguration; + @DoFn.BoundedPerElement + private static class ReadFn extends DoFn<String, KV<String, String>> { + protected final RedisConnectionConfiguration connectionConfiguration; transient Jedis jedis; + private long batchSize; Review comment: ```suggestion private final long batchSize; ``` ########## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.redis; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import java.io.Serializable; +import javax.annotation.Nonnull; + +public class RedisCursor implements Comparable<RedisCursor>, Serializable { + + public static final String START_CURSOR = "0"; + public static final String END_CURSOR = "0"; + + private final String cursor; + private final long nKeys; + private final long index; + + public static RedisCursor of(String cursor, long nKeys) throws IllegalStateException { Review comment: ```suggestion public static RedisCursor of(String cursor, long tableSize) throws IllegalStateException { ``` ########## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.redis; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import java.io.Serializable; +import javax.annotation.Nonnull; + +public class RedisCursor implements Comparable<RedisCursor>, Serializable { + + public static final String START_CURSOR = "0"; + public static final String END_CURSOR = "0"; + + private final String cursor; + private final long nKeys; + private final long index; + + public static RedisCursor of(String cursor, long nKeys) throws IllegalStateException { + if (nKeys == 0) { + throw new IllegalStateException("zero keys"); + } + return new RedisCursor(cursor, nKeys); + } + + private RedisCursor(String cursor, long nKeys) { + this.cursor = cursor; + this.nKeys = nKeys; + this.index = toIndex(); + } + + /** + * {@link RedisCursor} implements {@link Comparable Comparable<RedisCursor>} by transformig Review comment: ```suggestion * {@link RedisCursor} implements {@link Comparable Comparable<RedisCursor>} by transforming ``` ########## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursorTracker.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.redis; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class RedisCursorTracker extends RestrictionTracker<RedisCursorRange, RedisCursor> + implements RestrictionTracker.HasProgress { + + private RedisCursorRange range; + + private @Nullable RedisCursor lastAttemptedKey; + private @Nullable RedisCursor lastClaimedKey; + + private RedisCursorTracker(RedisCursorRange range) { + this.range = range; + } + + public static RedisCursorTracker of(RedisCursorRange range) { + return new RedisCursorTracker(range); + } + + @Override + public boolean tryClaim(RedisCursor position) { + if (!position.getCursor().equals(RedisCursor.END_CURSOR)) { Review comment: You should keep track of the _initial_ range and update `lastAttemptedKey` and `lastClaimedKey`. This will be needed for splitting and will simplify the error handling. ########## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursorTracker.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.redis; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class RedisCursorTracker extends RestrictionTracker<RedisCursorRange, RedisCursor> + implements RestrictionTracker.HasProgress { + + private RedisCursorRange range; + + private @Nullable RedisCursor lastAttemptedKey; + private @Nullable RedisCursor lastClaimedKey; + + private RedisCursorTracker(RedisCursorRange range) { + this.range = range; + } + + public static RedisCursorTracker of(RedisCursorRange range) { + return new RedisCursorTracker(range); + } + + @Override + public boolean tryClaim(RedisCursor position) { + if (!position.getCursor().equals(RedisCursor.END_CURSOR)) { + checkArgument( + position.compareTo(range.getEndPosition()) >= 0, + "Trying to claim cursor %s[%s] while range end was %s[%s]", + position.getCursor(), + position.getIndex(), + range.getEndPosition().getCursor(), + range.getEndPosition().getIndex()); + } + range = range.fromEndPosition(position); + return true; + } + + @Override + public RedisCursorRange currentRestriction() { + return range; + } + + @Override + public SplitResult<RedisCursorRange> trySplit(double fractionOfRemainder) { + /*if (fractionOfRemainder != 0.0 || range.getEndPosition().getCursor().equals(range.getStartPosition().getCursor())) { + return null; + } + return SplitResult.of( + RedisCursorRange.of(range.getStartPosition(), range.getStartPosition()), + range);*/ + return null; + } + + @Override + public void checkDone() throws IllegalStateException { + if (range.getStartPosition().compareTo(range.getEndPosition()) >= 0) { + return; + } + if 
(range.getStartPosition().getCursor().equals(RedisCursor.START_CURSOR) + && range.getStartPosition().getCursor().equals(RedisCursor.END_CURSOR)) { + return; + } + throw new IllegalStateException("not done iterating cursors"); + } + + @Override + public IsBounded isBounded() { + return IsBounded.BOUNDED; + } + + @Override + public Progress getProgress() { + return null; Review comment: You should be able to compute progress by expressing the current position as a fraction of the entire range and multiplying that fraction by the DB size. ########## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ########## @@ -334,94 +337,69 @@ public void setup() { public void teardown() { jedis.close(); } - } - private static class ReadKeysWithPattern extends BaseReadFn<String> { - - ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) { - super(connectionConfiguration); + @GetInitialRestriction + public RedisCursorRange getInitialRestriction() { + nKeys = jedis.dbSize(); + return RedisCursorRange.of(RedisCursor.of("0", nKeys), RedisCursor.of("0", nKeys)); } @ProcessElement - public void processElement(ProcessContext c) { + public ProcessContinuation processElement( + ProcessContext c, RestrictionTracker<RedisCursorRange, RedisCursor> tracker) { + String cursor = tracker.currentRestriction().getStartPosition().getCursor(); ScanParams scanParams = new ScanParams(); scanParams.match(c.element()); - - String cursor = ScanParams.SCAN_POINTER_START; - boolean finished = false; - while (!finished) { + while (true) { ScanResult<String> scanResult = jedis.scan(cursor, scanParams); - List<String> keys = scanResult.getResult(); - for (String k : keys) { - c.output(k); + if (!tracker.tryClaim(RedisCursor.of(scanResult.getCursor(), nKeys))) { Review comment: There is no guarantee that `nKeys` will be populated since `@ProcessElement` and `@GetInitialRestriction` can execute on different machines. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
