[
https://issues.apache.org/jira/browse/BEAM-6099?focusedWorklogId=168359&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-168359
]
ASF GitHub Bot logged work on BEAM-6099:
----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Nov/18 16:35
Start Date: 21/Nov/18 16:35
Worklog Time Spent: 10m
Work Description: aromanenko-dev closed pull request #7093: [BEAM-6099] RedisIO support for PFADD operation
URL: https://github.com/apache/beam/pull/7093
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 8a58e2860f8..afe378108fc 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -444,7 +444,10 @@ public void processElement(ProcessContext context) {
* exist, it is created as empty list before performing the push operations. When key holds a
* value that is not a list, an error is returned.
*/
- RPUSH
+ RPUSH,
+
+ /** Use PFADD command. Insert value in a HLL structure. Create key if it doesn't exist */
+ PFADD
}
@Nullable
@@ -567,6 +570,8 @@ private void writeRecord(KV<String, String> record) {
writeUsingSetCommand(record, expireTime);
} else if (Method.LPUSH == method || Method.RPUSH == method) {
writeUsingListCommand(record, method, expireTime);
+ } else if (Method.PFADD == method) {
+ writeUsingHLLCommand(record, method, expireTime);
}
}
@@ -605,6 +610,13 @@ private void writeUsingListCommand(
setExpireTimeWhenRequired(key, expireTime);
}
+ private void writeUsingHLLCommand(KV<String, String> record, Method method, Long expireTime) {
+ String key = record.getKey();
+ String value = record.getValue();
+
+ pipeline.pfadd(key, value);
+ }
+
private void setExpireTimeWhenRequired(String key, Long expireTime) {
if (expireTime != null) {
pipeline.pexpire(key, expireTime);
diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index 3280ab4820d..e2c10f633e8 100644
--- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -186,6 +186,34 @@ public void testWriteReadUsingRpushMethod() throws Exception {
Assert.assertEquals(value + newValue, String.join("", values));
}
+ @Test
+ public void testWriteUsingHLLMethod() throws Exception {
+ String key = "key";
+
+ Jedis jedis =
+ RedisConnectionConfiguration.create(REDIS_HOST, embeddedRedis.getPort()).connect();
+
+ PCollection<KV<String, String>> write =
+ writePipeline.apply(
+ Create.of(
+ KV.of(key, "0"),
+ KV.of(key, "1"),
+ KV.of(key, "2"),
+ KV.of(key, "3"),
+ KV.of(key, "2"),
+ KV.of(key, "4"),
+ KV.of(key, "0"),
+ KV.of(key, "5")));
+
+ write.apply(
+ RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withMethod(Method.PFADD));
+
+ writePipeline.run();
+
+ long count = jedis.pfcount(key);
+ Assert.assertEquals(6, count);
+ }
+
@Test
public void testReadBuildsCorrectly() {
RedisIO.Read read = RedisIO.read().withEndpoint("test", 111).withAuth("pass").withTimeout(5);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 168359)
Time Spent: 3h 20m (was: 3h 10m)
> RedisIO support for PFADD operation
> -----------------------------------
>
> Key: BEAM-6099
> URL: https://issues.apache.org/jira/browse/BEAM-6099
> Project: Beam
> Issue Type: Improvement
> Components: io-java-redis
> Affects Versions: 2.7.0
> Reporter: Varun Dhussa
> Assignee: Varun Dhussa
> Priority: Minor
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> HyperLogLog (HLL) has multiple use cases, such as analytics dashboards. It allows fast
> approximate distinct counts and easy unions over multiple sets while saving memory.
> I have created a small improvement that adds support for the PFADD command.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)