mosche commented on a change in pull request #15858:
URL: https://github.com/apache/beam/pull/15858#discussion_r767724831
##########
File path:
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -105,6 +107,57 @@
* .apply(RedisIO.write().withEndpoint("::1", 6379))
*
* }</pre>
+ *
+ * <h3>Writing Redis Streams</h3>
+ *
+ * <p>{@link #writeStreams()} provides a sink to write key/value pairs
represented as {@link KV}
Review comment:
I find the Redis documentation for XADD quite intuitive here:
> Appends the specified stream entry to the stream at the specified key.
How about changing this to the following?
```
{@link #writeStreams()} appends the entries of a {@link PCollection} of key/value pairs
represented as {@link KV} to the Redis stream at the specified key using the
<a href='https://redis.io/commands/XADD'>XADD</a> API.
```
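For illustration only, here is a minimal in-memory sketch of the append semantics that wording describes. It is not RedisIO or Jedis code: `StreamAppendSketch`, `append` and `entries` are hypothetical names, and the class assumes Java 9+ for `Map.of`. The point it shows is that two entries written to the same key end up as two entries in one stream, which is what the test in this PR asserts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for Redis streams (hypothetical, for illustration only). */
final class StreamAppendSketch {
  // Stream key -> entries in append order. A real Redis stream also assigns each entry an ID.
  private final Map<String, List<Map<String, String>>> streams = new LinkedHashMap<>();

  /** Models XADD: appends one field/value entry to the stream at the given key. */
  void append(String key, Map<String, String> fields) {
    streams.computeIfAbsent(key, k -> new ArrayList<>()).add(fields);
  }

  /** Returns the entries of the stream at the given key, oldest first (empty if absent). */
  List<Map<String, String>> entries(String key) {
    return streams.getOrDefault(key, new ArrayList<>());
  }

  public static void main(String[] args) {
    StreamAppendSketch redis = new StreamAppendSketch();
    redis.append("sensors", Map.of("sensor-id", "1234", "temperature", "19.8"));
    redis.append("sensors", Map.of("sensor-id", "9999", "temperature", "18.2"));
    System.out.println(redis.entries("sensors").size()); // prints 2
  }
}
```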
##########
File path:
sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -250,6 +260,102 @@ public void testWriteUsingDECRBY() {
assertEquals(-1, count);
}
+ @Test
+ public void testWriteStreams() {
+ ArrayList<String> streams = new ArrayList<String>();
+ for (int i = 0; i <= 10; i++) {
+ UUID uuid = UUID.randomUUID();
+ /* stream keys are uuids to ensure that test runs are idempotent */
+ streams.add(uuid.toString());
+ }
+ Map<String, String> fooValues = ImmutableMap.of("sensor-id", "1234",
"temperature", "19.8");
+ Map<String, String> barValues = ImmutableMap.of("sensor-id", "9999",
"temperature", "18.2");
+
+ List<KV<String, Map<String, String>>> fooData =
+ streams.stream().map(key -> KV.of(key,
fooValues)).collect(Collectors.toList());
+
+ List<KV<String, Map<String, String>>> barData =
+ streams.stream().map(key -> KV.of(key,
barValues)).collect(Collectors.toList());
+
+ List<KV<String, Map<String, String>>> allData =
+ Stream.of(fooData,
barData).flatMap(Collection::stream).collect(Collectors.toList());
+
+ PCollection<KV<String, Map<String, String>>> write =
+ p.apply(
+ Create.of(allData)
+ .withCoder(
+ KvCoder.of(
+ StringUtf8Coder.of(),
+ MapCoder.of(StringUtf8Coder.of(),
StringUtf8Coder.of()))));
+ write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+ p.run();
+
+ for (String stream : streams) {
Review comment:
Please use matchers to express this in a more intuitive way, e.g.
```
// Only needed if the separate `streams` list goes away, i.e. if allData is generated
// directly as suggested in my other comment.
Set<String> streams = ImmutableSet.copyOf(transform(allData, KV::getKey));
for (String stream : streams) {
  List<StreamEntry> results = client.xrange(stream, null, null, Integer.MAX_VALUE);
  assertEquals(2, results.size());
  assertThat(transform(results, StreamEntry::getFields), hasItems(fooValues, barValues));
}
```
Note, this uses a few static imports:
```
import static java.util.stream.Collectors.toList;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.transform;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
```
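As a rough, JDK-only sketch of what the size check plus `hasItems` expresses: `hasItems` passes when every listed item occurs somewhere in the iterable, in any order, so the assertion does not depend on whether the foo or the bar entry was written first (a `PCollection` has no defined element order). `ContainsBothSketch` and `hasExactlyBoth` are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** JDK-only approximation of assertEquals(2, size) combined with hasItems(first, second). */
final class ContainsBothSketch {
  /**
   * Returns true when results holds exactly two entries and both expected maps are
   * among them, in either order. Assumes the two expected maps are not equal to each other.
   */
  static boolean hasExactlyBoth(
      List<Map<String, String>> results, Map<String, String> first, Map<String, String> second) {
    return results.size() == 2 && results.containsAll(Arrays.asList(first, second));
  }
}
```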
##########
File path:
sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
##########
@@ -250,6 +260,102 @@ public void testWriteUsingDECRBY() {
assertEquals(-1, count);
}
+ @Test
+ public void testWriteStreams() {
+ ArrayList<String> streams = new ArrayList<String>();
+ for (int i = 0; i <= 10; i++) {
+ UUID uuid = UUID.randomUUID();
+ /* stream keys are uuids to ensure that test runs are idempotent */
+ streams.add(uuid.toString());
+ }
+ Map<String, String> fooValues = ImmutableMap.of("sensor-id", "1234",
"temperature", "19.8");
+ Map<String, String> barValues = ImmutableMap.of("sensor-id", "9999",
"temperature", "18.2");
+
+ List<KV<String, Map<String, String>>> fooData =
+ streams.stream().map(key -> KV.of(key,
fooValues)).collect(Collectors.toList());
+
+ List<KV<String, Map<String, String>>> barData =
+ streams.stream().map(key -> KV.of(key,
barValues)).collect(Collectors.toList());
+
+ List<KV<String, Map<String, String>>> allData =
+ Stream.of(fooData,
barData).flatMap(Collection::stream).collect(Collectors.toList());
Review comment:
Not blocking, but here is a more idiomatic alternative that again uses
streams to generate the input data:
```java
Map<String, String> fooValues = ImmutableMap.of("sensor-id", "1234", "temperature", "19.8");
Map<String, String> barValues = ImmutableMap.of("sensor-id", "9999", "temperature", "18.2");
List<KV<String, Map<String, String>>> allData =
    IntStream.range(0, 10)
        .boxed()
        .flatMap(
            id ->
                Stream.of(
                    KV.of("testWriteStreams" + id, fooValues),
                    KV.of("testWriteStreams" + id, barValues)))
        .collect(toList());
```
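To sanity-check the shape of the generated data without Beam on the classpath, the same pipeline can be sketched with JDK types only. Assumptions: `Map.entry` stands in for `KV.of` and `Map.of` for `ImmutableMap.of` (both Java 9+), and `TestDataSketch` and `generate` are hypothetical names. Note that `IntStream.range(0, 10)` yields 10 keys, whereas the loop in the diff (`i <= 10`) yields 11, and that fixed keys give up the UUID-based idempotency mentioned in the diff, so this presumably relies on each test run starting with an empty Redis.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/** JDK-only stand-in for the suggested test data generation (illustration only). */
final class TestDataSketch {
  /** Builds two entries (foo and bar) for each of streamCount stream keys. */
  static List<Map.Entry<String, Map<String, String>>> generate(int streamCount) {
    Map<String, String> fooValues = Map.of("sensor-id", "1234", "temperature", "19.8");
    Map<String, String> barValues = Map.of("sensor-id", "9999", "temperature", "18.2");
    return IntStream.range(0, streamCount)
        .boxed()
        .flatMap(
            id ->
                Stream.of(
                    Map.entry("testWriteStreams" + id, fooValues),
                    Map.entry("testWriteStreams" + id, barValues)))
        .collect(Collectors.toList());
  }
}
```

With `generate(10)` this produces 20 elements spread over 10 distinct keys, two per key, which matches the `assertEquals(2, results.size())` check per stream.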
##########
File path:
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -142,6 +144,15 @@ public static Write write() {
.build();
}
+ /** Write data to a Redis server. */
+ public static WriteStreams writeStreams() {
Review comment:
Thanks for adding the docs 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.