TeoZosa commented on issue #26165:
URL: https://github.com/apache/beam/issues/26165#issuecomment-1655118265
+1 for this feature as well. In our case we write marshaled Protobuf bytes, and stringifying the byte array leads to data loss and subsequent corruption (see the round-trip sketch below). As an immediate fix, we did the simplest thing and vendored the Redis IO, swapping `<String, String>` for `<byte[], byte[]>`. @sigalite's solution sounds more robust, though.
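As a quick illustration of the corruption (a standalone sketch with made-up bytes, not code from our pipeline): arbitrary serialized payloads usually contain sequences that are not valid UTF-8, so decoding them into a `String` and re-encoding does not round-trip.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StringRoundTripDemo {
  public static void main(String[] args) {
    // Made-up bytes standing in for a marshaled Protobuf message.
    byte[] original = new byte[] {0x08, (byte) 0x96, 0x01, (byte) 0xFF, 0x00};
    // Invalid UTF-8 sequences are replaced with U+FFFD during decoding...
    String stringified = new String(original, StandardCharsets.UTF_8);
    // ...so re-encoding yields different bytes than we started with.
    byte[] roundTripped = stringified.getBytes(StandardCharsets.UTF_8);
    System.out.println(Arrays.equals(original, roundTripped)); // prints false
  }
}
```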
<details>
<summary>e.g., changes to the <code>Write</code> class (click to view)</summary>

```diff
===================================================================
diff --git a/vendor/org/apache/beam/sdk/io/redis/RedisIO.java b/vendor/org/apache/beam/sdk/io/redis/RedisIO.java
--- a/vendor/org/apache/beam/sdk/io/redis/RedisIO.java (revision aaa)
+++ b/vendor/org/apache/beam/sdk/io/redis/RedisIO.java (revision bbb)
@@ -20,6 +20,7 @@
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import com.google.auto.value.AutoValue;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -437,9 +438,9 @@
     }
   }
-  /** A {@link PTransform} to write to a Redis server. */
+  /** A {@link PTransform} to write to a Redis server. */
   @AutoValue
-  public abstract static class Write extends PTransform<PCollection<KV<String, String>>, PDone> {
+  public abstract static class Write extends PTransform<PCollection<KV<byte[], byte[]>>, PDone> {
     /** Determines the method used to insert data in Redis. */
     public enum Method {
@@ -540,14 +541,14 @@
     }
     @Override
-    public PDone expand(PCollection<KV<String, String>> input) {
+    public PDone expand(PCollection<KV<byte[], byte[]>> input) {
       checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
       input.apply(ParDo.of(new WriteFn(this)));
       return PDone.in(input.getPipeline());
     }
-    private static class WriteFn extends DoFn<KV<String, String>, Void> {
+    private static class WriteFn extends DoFn<KV<byte[], byte[]>, Void> {
       private static final int DEFAULT_BATCH_SIZE = 1000;
@@ -575,7 +576,7 @@
       @ProcessElement
       public void processElement(ProcessContext c) {
-        KV<String, String> record = c.element();
+        KV<byte[], byte[]> record = c.element();
         writeRecord(record);
@@ -588,7 +589,7 @@
         }
       }
-      private void writeRecord(KV<String, String> record) {
+      private void writeRecord(KV<byte[], byte[]> record) {
         Method method = spec.method();
         Long expireTime = spec.expireTime();
@@ -609,18 +610,18 @@
         }
       }
-      private void writeUsingAppendCommand(KV<String, String> record, Long expireTime) {
-        String key = record.getKey();
-        String value = record.getValue();
+      private void writeUsingAppendCommand(KV<byte[], byte[]> record, Long expireTime) {
+        byte[] key = record.getKey();
+        byte[] value = record.getValue();
         transaction.append(key, value);
         setExpireTimeWhenRequired(key, expireTime);
       }
-      private void writeUsingSetCommand(KV<String, String> record, Long expireTime) {
-        String key = record.getKey();
-        String value = record.getValue();
+      private void writeUsingSetCommand(KV<byte[], byte[]> record, Long expireTime) {
+        byte[] key = record.getKey();
+        byte[] value = record.getValue();
         if (expireTime != null) {
           transaction.psetex(key, expireTime, value);
@@ -630,10 +631,10 @@
       }
       private void writeUsingListCommand(
-          KV<String, String> record, Method method, Long expireTime) {
+          KV<byte[], byte[]> record, Method method, Long expireTime) {
-        String key = record.getKey();
-        String value = record.getValue();
+        byte[] key = record.getKey();
+        byte[] value = record.getValue();
         if (Method.LPUSH == method) {
           transaction.lpush(key, value);
@@ -644,43 +645,43 @@
         setExpireTimeWhenRequired(key, expireTime);
       }
-      private void writeUsingSaddCommand(KV<String, String> record, Long expireTime) {
-        String key = record.getKey();
-        String value = record.getValue();
+      private void writeUsingSaddCommand(KV<byte[], byte[]> record, Long expireTime) {
+        byte[] key = record.getKey();
+        byte[] value = record.getValue();
         transaction.sadd(key, value);
         setExpireTimeWhenRequired(key, expireTime);
       }
-      private void writeUsingHLLCommand(KV<String, String> record, Long expireTime) {
-        String key = record.getKey();
-        String value = record.getValue();
+      private void writeUsingHLLCommand(KV<byte[], byte[]> record, Long expireTime) {
+        byte[] key = record.getKey();
+        byte[] value = record.getValue();
         transaction.pfadd(key, value);
         setExpireTimeWhenRequired(key, expireTime);
       }
-      private void writeUsingIncrBy(KV<String, String> record, Long expireTime) {
-        String key = record.getKey();
-        String value = record.getValue();
-        long inc = Long.parseLong(value);
+      private void writeUsingIncrBy(KV<byte[], byte[]> record, Long expireTime) {
+        byte[] key = record.getKey();
+        byte[] value = record.getValue();
+        long inc = Long.parseLong(new String(value, StandardCharsets.ISO_8859_1));
         transaction.incrBy(key, inc);
         setExpireTimeWhenRequired(key, expireTime);
       }
-      private void writeUsingDecrBy(KV<String, String> record, Long expireTime) {
-        String key = record.getKey();
-        String value = record.getValue();
-        long decr = Long.parseLong(value);
+      private void writeUsingDecrBy(KV<byte[], byte[]> record, Long expireTime) {
+        byte[] key = record.getKey();
+        byte[] value = record.getValue();
+        long decr = Long.parseLong(new String(value, StandardCharsets.ISO_8859_1));
         transaction.decrBy(key, decr);
         setExpireTimeWhenRequired(key, expireTime);
       }
-      private void setExpireTimeWhenRequired(String key, Long expireTime) {
+      private void setExpireTimeWhenRequired(byte[] key, Long expireTime) {
         if (expireTime != null) {
           transaction.pexpire(key, expireTime);
         }
@@ -706,12 +707,12 @@
   }
   /**
-   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
+   * A {@link PTransform} to write stream key pairs (https://redis.io/topics/streams-intro) to a
    * Redis server.
    */
   @AutoValue
   public abstract static class WriteStreams
-      extends PTransform<PCollection<KV<String, Map<String, String>>>, PDone> {
+      extends PTransform<PCollection<KV<byte[], Map<byte[], byte[]>>>, PDone> {
     abstract RedisConnectionConfiguration connectionConfiguration();
@@ -784,14 +785,14 @@
     }
     @Override
-    public PDone expand(PCollection<KV<String, Map<String, String>>> input) {
+    public PDone expand(PCollection<KV<byte[], Map<byte[], byte[]>>> input) {
       checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
       input.apply(ParDo.of(new WriteStreamFn(this)));
       return PDone.in(input.getPipeline());
     }
-    private static class WriteStreamFn extends DoFn<KV<String, Map<String, String>>, Void> {
+    private static class WriteStreamFn extends DoFn<KV<byte[], Map<byte[], byte[]>>, Void> {
       private static final int DEFAULT_BATCH_SIZE = 1000;
@@ -819,7 +820,7 @@
       @ProcessElement
      public void processElement(ProcessContext c) {
-        KV<String, Map<String, String>> record = c.element();
+        KV<byte[], Map<byte[], byte[]>> record = c.element();
         writeRecord(record);
@@ -832,9 +833,9 @@
         }
       }
-      private void writeRecord(KV<String, Map<String, String>> record) {
-        String key = record.getKey();
-        Map<String, String> value = record.getValue();
+      private void writeRecord(KV<byte[], Map<byte[], byte[]>> record) {
+        byte[] key = record.getKey();
+        Map<byte[], byte[]> value = record.getValue();
         final XAddParams params = new XAddParams().id(StreamEntryID.NEW_ENTRY);
         if (spec.maxLen() > 0L) {
           params.maxLen(spec.maxLen());
```
</details>
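For reference, this is roughly how the vendored transform gets applied (a hedged sketch: the endpoint, key, and payload below are placeholders, and `RedisIO` here resolves to our vendored copy rather than the upstream class):

```java
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.redis.RedisIO; // the vendored copy on our classpath
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;

public class VendoredRedisWriteExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    byte[] key = "some:key".getBytes(StandardCharsets.UTF_8); // placeholder key
    byte[] value = new byte[] {0x08, (byte) 0x96, 0x01}; // placeholder for a marshaled Protobuf payload

    pipeline
        .apply(
            // Use ByteArrayCoder for both keys and values so no String conversion happens.
            Create.of(KV.of(key, value))
                .withCoder(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of())))
        // The vendored Write is now a PTransform<PCollection<KV<byte[], byte[]>>, PDone>.
        .apply(RedisIO.write().withEndpoint("localhost", 6379));

    pipeline.run().waitUntilFinish();
  }
}
```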