[
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117792&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117792
]
ASF GitHub Bot logged work on BEAM-3446:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Jun/18 07:44
Start Date: 30/Jun/18 07:44
Worklog Time Spent: 10m
Work Description: vvarma closed pull request #4656: [BEAM-3446] Fixes
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/4656
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 2559d0773cc..f58200a0f36 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -200,6 +200,7 @@ public void populateDisplayData(DisplayData.Builder
builder) {
return input
.apply(Create.of(keyPattern()))
+ .apply(ParDo.of(new ReadKeywsWithPattern(connectionConfiguration())))
.apply(RedisIO.readAll().withConnectionConfiguration(connectionConfiguration()));
}
@@ -260,16 +261,12 @@ public ReadAll
withConnectionConfiguration(RedisConnectionConfiguration connecti
}
- /**
- * A {@link DoFn} requesting Redis server to get key/value pairs.
- */
- private static class ReadFn extends DoFn<String, KV<String, String>> {
-
- private final RedisConnectionConfiguration connectionConfiguration;
+ private abstract static class BaseReadFn<T> extends DoFn<String, T> {
+ protected final RedisConnectionConfiguration connectionConfiguration;
- private transient Jedis jedis;
+ protected transient Jedis jedis;
- public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+ public BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
this.connectionConfiguration = connectionConfiguration;
}
@@ -278,6 +275,18 @@ public void setup() {
jedis = connectionConfiguration.connect();
}
+ @Teardown
+ public void teardown() {
+ jedis.close();
+ }
+ }
+
+ private static class ReadKeywsWithPattern extends BaseReadFn<String> {
+
+ ReadKeywsWithPattern(RedisConnectionConfiguration connectionConfiguration)
{
+ super(connectionConfiguration);
+ }
+
@ProcessElement
public void processElement(ProcessContext processContext) throws Exception
{
ScanParams scanParams = new ScanParams();
@@ -288,28 +297,33 @@ public void processElement(ProcessContext processContext)
throws Exception {
while (!finished) {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
List<String> keys = scanResult.getResult();
-
- Pipeline pipeline = jedis.pipelined();
- if (keys != null) {
- for (String key : keys) {
- pipeline.get(key);
- }
- List<Object> values = pipeline.syncAndReturnAll();
- for (int i = 0; i < values.size(); i++) {
- processContext.output(KV.of(keys.get(i), (String) values.get(i)));
- }
+ for (String k : keys) {
+ processContext.output(k);
}
-
cursor = scanResult.getStringCursor();
if (cursor.equals("0")) {
finished = true;
}
}
}
+ }
+ /**
+ * A {@link DoFn} requesting Redis server to get key/value pairs.
+ */
+ private static class ReadFn extends BaseReadFn<KV<String, String>> {
- @Teardown
- public void teardown() {
- jedis.close();
+ ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+ super(connectionConfiguration);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext processContext) throws Exception
{
+ String key = processContext.element();
+
+ String value = jedis.get(key);
+ if (value != null) {
+ processContext.output(KV.of(key, value));
+ }
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 117792)
Time Spent: 2h 10m (was: 2h)
> RedisIO non-prefix read operations
> ----------------------------------
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Vinay varma
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Read operation in RedisIO is for prefix based look ups. While this can be
> used for exact key matches as well, the number of operations limits the
> through put of the function.
> I suggest exposing current readAll operation as readbyprefix and using more
> simpler operations for readAll functionality.
> ex:
> {code:java}
> String output = jedis.get(element);
> if (output != null) {
> processContext.output(KV.of(element, output));
> }
> {code}
> instead of:
> https://github.com/apache/beam/blob/7d240c0bb171af6868f1a6e95196c9dcfc9ac640/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L292
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)