TeoZosa commented on issue #26165:
URL: https://github.com/apache/beam/issues/26165#issuecomment-1655118265

   +1 for this feature as well. In our case, we use marshaled Protobuf bytes, and 
stringifying the byte array leads to data loss and subsequent corruption. 
   
   As an immediate fix, we did the simplest thing and vendored the Redis IO, 
replacing `<String, String>` with `<byte[], byte[]>`. @sigalite's solution sounds 
more robust, though.
   
   <details> <summary> e.g., changes to the `Write` and `WriteStreams` classes (click to view) 
</summary>
   
   ```diff
   ===================================================================
   diff --git a/vendor/org/apache/beam/sdk/io/redis/RedisIO.java 
b/vendor/org/apache/beam/sdk/io/redis/RedisIO.java
   --- a/vendor/org/apache/beam/sdk/io/redis/RedisIO.java       (revision aaa)
   +++ b/vendor/org/apache/beam/sdk/io/redis/RedisIO.java       (revision bbb)
   @@ -20,6 +20,7 @@
    import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
    
    import com.google.auto.value.AutoValue;
   +import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    import org.apache.beam.sdk.coders.KvCoder;
   @@ -437,9 +438,9 @@
        }
      }
    
   -  /** A {@link PTransform} to write to a Redis server. */
   +  /** A {@link PTransform} to write to a Redis server. */
      @AutoValue
   -  public abstract static class Write extends 
PTransform<PCollection<KV<String, String>>, PDone> {
   +  public abstract static class Write extends 
PTransform<PCollection<KV<byte[], byte[]>>, PDone> {
    
        /** Determines the method used to insert data in Redis. */
        public enum Method {
   @@ -540,14 +541,14 @@
        }
    
        @Override
   -    public PDone expand(PCollection<KV<String, String>> input) {
   +    public PDone expand(PCollection<KV<byte[], byte[]>> input) {
          checkArgument(connectionConfiguration() != null, 
"withConnectionConfiguration() is required");
    
          input.apply(ParDo.of(new WriteFn(this)));
          return PDone.in(input.getPipeline());
        }
    
   -    private static class WriteFn extends DoFn<KV<String, String>, Void> {
   +    private static class WriteFn extends DoFn<KV<byte[], byte[]>, Void> {
    
          private static final int DEFAULT_BATCH_SIZE = 1000;
    
   @@ -575,7 +576,7 @@
    
          @ProcessElement
          public void processElement(ProcessContext c) {
   -        KV<String, String> record = c.element();
   +        KV<byte[], byte[]> record = c.element();
    
            writeRecord(record);
    
   @@ -588,7 +589,7 @@
            }
          }
    
   -      private void writeRecord(KV<String, String> record) {
   +      private void writeRecord(KV<byte[], byte[]> record) {
            Method method = spec.method();
            Long expireTime = spec.expireTime();
    
   @@ -609,18 +610,18 @@
            }
          }
    
   -      private void writeUsingAppendCommand(KV<String, String> record, Long 
expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   +      private void writeUsingAppendCommand(KV<byte[], byte[]> record, Long 
expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            transaction.append(key, value);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingSetCommand(KV<String, String> record, Long 
expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   +      private void writeUsingSetCommand(KV<byte[], byte[]> record, Long 
expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            if (expireTime != null) {
              transaction.psetex(key, expireTime, value);
   @@ -630,10 +631,10 @@
          }
    
          private void writeUsingListCommand(
   -          KV<String, String> record, Method method, Long expireTime) {
   +          KV<byte[], byte[]> record, Method method, Long expireTime) {
    
   -        String key = record.getKey();
   -        String value = record.getValue();
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            if (Method.LPUSH == method) {
              transaction.lpush(key, value);
   @@ -644,43 +645,43 @@
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingSaddCommand(KV<String, String> record, Long 
expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   +      private void writeUsingSaddCommand(KV<byte[], byte[]> record, Long 
expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            transaction.sadd(key, value);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingHLLCommand(KV<String, String> record, Long 
expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   +      private void writeUsingHLLCommand(KV<byte[], byte[]> record, Long 
expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
    
            transaction.pfadd(key, value);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingIncrBy(KV<String, String> record, Long 
expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   -        long inc = Long.parseLong(value);
   +      private void writeUsingIncrBy(KV<byte[], byte[]> record, Long 
expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
   +        long inc = Long.parseLong(new String(value, 
StandardCharsets.ISO_8859_1));
            transaction.incrBy(key, inc);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void writeUsingDecrBy(KV<String, String> record, Long 
expireTime) {
   -        String key = record.getKey();
   -        String value = record.getValue();
   -        long decr = Long.parseLong(value);
   +      private void writeUsingDecrBy(KV<byte[], byte[]> record, Long 
expireTime) {
   +        byte[] key = record.getKey();
   +        byte[] value = record.getValue();
   +        long decr = Long.parseLong(new String(value, 
StandardCharsets.ISO_8859_1));
            transaction.decrBy(key, decr);
    
            setExpireTimeWhenRequired(key, expireTime);
          }
    
   -      private void setExpireTimeWhenRequired(String key, Long expireTime) {
   +      private void setExpireTimeWhenRequired(byte[] key, Long expireTime) {
            if (expireTime != null) {
              transaction.pexpire(key, expireTime);
            }
   @@ -706,12 +707,12 @@
      }
    
      /**
   -   * A {@link PTransform} to write stream key pairs 
(https://redis.io/topics/streams-intro) to a
   +   * A {@link PTransform} to write stream key pairs 
(https://redis.io/topics/streams-intro) to a
       * Redis server.
       */
      @AutoValue
      public abstract static class WriteStreams
   -      extends PTransform<PCollection<KV<String, Map<String, String>>>, 
PDone> {
   +      extends PTransform<PCollection<KV<byte[], Map<byte[], byte[]>>>, 
PDone> {
    
        abstract RedisConnectionConfiguration connectionConfiguration();
    
   @@ -784,14 +785,14 @@
        }
    
        @Override
   -    public PDone expand(PCollection<KV<String, Map<String, String>>> input) 
{
   +    public PDone expand(PCollection<KV<byte[], Map<byte[], byte[]>>> input) 
{
          checkArgument(connectionConfiguration() != null, 
"withConnectionConfiguration() is required");
    
          input.apply(ParDo.of(new WriteStreamFn(this)));
          return PDone.in(input.getPipeline());
        }
    
   -    private static class WriteStreamFn extends DoFn<KV<String, Map<String, 
String>>, Void> {
   +    private static class WriteStreamFn extends DoFn<KV<byte[], Map<byte[], 
byte[]>>, Void> {
    
          private static final int DEFAULT_BATCH_SIZE = 1000;
    
   @@ -819,7 +820,7 @@
    
          @ProcessElement
          public void processElement(ProcessContext c) {
   -        KV<String, Map<String, String>> record = c.element();
   +        KV<byte[], Map<byte[], byte[]>> record = c.element();
    
            writeRecord(record);
    
   @@ -832,9 +833,9 @@
            }
          }
    
   -      private void writeRecord(KV<String, Map<String, String>> record) {
   -        String key = record.getKey();
   -        Map<String, String> value = record.getValue();
   +      private void writeRecord(KV<byte[], Map<byte[], byte[]>> record) {
   +        byte[] key = record.getKey();
   +        Map<byte[], byte[]> value = record.getValue();
            final XAddParams params = new 
XAddParams().id(StreamEntryID.NEW_ENTRY);
            if (spec.maxLen() > 0L) {
              params.maxLen(spec.maxLen());
   
   ```
   
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to