This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6b9074e769 [Hotfix] Fix redis sink NPE (#8171)
6b9074e769 is described below

commit 6b9074e7699d48be970f788fc0eb3d8beb42a86f
Author: limin <[email protected]>
AuthorDate: Mon Dec 2 19:24:11 2024 +0800

    [Hotfix] Fix redis sink NPE (#8171)
---
 .../seatunnel/redis/sink/RedisSinkWriter.java      | 15 ++--
 .../connector/redis/RedisTestCaseTemplateIT.java   | 76 +++++++++++++++++++
 .../fake-to-redis-test-custom-key-is-null.conf     | 87 +++++++++++++++++++++
 ...is-test-custom-value-when-hash-key-is-null.conf | 87 +++++++++++++++++++++
 ...-test-custom-value-when-hash-value-is-null.conf | 88 ++++++++++++++++++++++
 ...-test-custom-value-when-other-type-is-null.conf | 87 +++++++++++++++++++++
 .../fake-to-redis-test-normal-key-is-null.conf     | 86 +++++++++++++++++++++
 7 files changed, 521 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index b42fc6107b..71739b5789 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -94,7 +94,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
 
     private static String getNormalKey(SeaTunnelRow element, List<String> fields, String keyField) {
         if (fields.contains(keyField)) {
-            return element.getField(fields.indexOf(keyField)).toString();
+            Object fieldValue = element.getField(fields.indexOf(keyField));
+            return fieldValue == null ? "" : fieldValue.toString();
         } else {
             return keyField;
         }
@@ -109,7 +110,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
                     && keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) {
                 String realKeyField = keyFieldSegment.substring(1, keyFieldSegment.length() - 1);
                 if (fields.contains(realKeyField)) {
-                    key.append(element.getField(fields.indexOf(realKeyField)).toString());
+                    Object realFieldValue = element.getField(fields.indexOf(realKeyField));
+                    key.append(realFieldValue == null ? "" : realFieldValue.toString());
                 } else {
                     key.append(keyFieldSegment);
                 }
@@ -146,7 +148,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
         }
         String hashKey;
         if (fields.contains(hashKeyField)) {
-            hashKey = element.getField(fields.indexOf(hashKeyField)).toString();
+            Object hashKeyFieldValue = element.getField(fields.indexOf(hashKeyField));
+            hashKey = hashKeyFieldValue == null ? "" : hashKeyFieldValue.toString();
         } else {
             hashKey = hashKeyField;
         }
@@ -155,7 +158,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
             hashValue = new String(serializationSchema.serialize(element));
         } else {
             if (fields.contains(hashValueField)) {
-                hashValue = element.getField(fields.indexOf(hashValueField)).toString();
+                Object hashValueFieldValue = element.getField(fields.indexOf(hashValueField));
+                hashValue = hashValueFieldValue == null ? "" : hashValueFieldValue.toString();
             } else {
                 hashValue = hashValueField;
             }
@@ -171,7 +175,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
             return null;
         }
         if (fields.contains(valueField)) {
-            return element.getField(fields.indexOf(valueField)).toString();
+            Object fieldValue = element.getField(fields.indexOf(valueField));
+            return fieldValue == null ? "" : fieldValue.toString();
         }
         return valueField;
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index efd9d8df44..96ac20cbe6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -501,5 +501,81 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
         jedis.del("zset_check");
     }
 
+    @TestTemplate
+    public void testFakeToRedisNormalKeyIsNullTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/fake-to-redis-test-normal-key-is-null.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        int count = 0;
+        String data = jedis.get("");
+        if (data != null) {
+            count++;
+            jedis.del("");
+        }
+        for (int i = 2; i <= 3; i++) {
+            data = jedis.get("NEW" + i);
+            if (data != null) {
+                count++;
+                jedis.del("NEW" + i);
+            }
+        }
+        Assertions.assertEquals(2, count);
+    }
+
+    @TestTemplate
+    public void testFakeToRedisCustomKeyIsNullTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/fake-to-redis-test-custom-key-is-null.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        int count = 0;
+        String data = jedis.get("key_check:");
+        if (data != null) {
+            count++;
+            jedis.del("key_check:");
+        }
+        for (int i = 2; i <= 3; i++) {
+            data = jedis.get("key_check:NEW" + i);
+            if (data != null) {
+                count++;
+                jedis.del("key_check:NEW" + i);
+            }
+        }
+        Assertions.assertEquals(2, count);
+    }
+
+    @TestTemplate
+    public void testFakeToRedisOtherTypeValueIsNullTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        "/fake-to-redis-test-custom-value-when-other-type-is-null.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(2, jedis.llen("list_check"));
+        jedis.del("list_check");
+    }
+
+    @TestTemplate
+    public void testFakeToRedisHashTypeKeyIsNullTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/fake-to-redis-test-custom-value-when-hash-key-is-null.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(2, jedis.hlen("hash_check"));
+        jedis.del("hash_check");
+    }
+
+    @TestTemplate
+    public void testFakeToRedisHashTypeValueIsNullTest(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        "/fake-to-redis-test-custom-value-when-hash-value-is-null.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(2, jedis.hlen("hash_check"));
+        jedis.del("hash_check");
+    }
+
     public abstract RedisContainerInfo getRedisContainerInfo();
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
new file mode 100644
index 0000000000..20ec18450a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "key_check:{val_string}"
+    data_type = key
+    support_custom_key = true
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
new file mode 100644
index 0000000000..774b4aa379
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "hash_check"
+    data_type = hash
+    hash_key_field = "val_string"
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
new file mode 100644
index 0000000000..8d3c2fee7a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "hash_check"
+    data_type = hash
+    hash_key_field = "id"
+    hash_value_field = "val_string"
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
new file mode 100644
index 0000000000..1eab8030d9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "list_check"
+    data_type = list
+    value_field = "val_string"
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-normal-key-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-normal-key-is-null.conf
new file mode 100644
index 0000000000..b9123c294a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-normal-key-is-null.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "val_string"
+    data_type = key
+    batch_size = 33
+  }
+}
\ No newline at end of file

Reply via email to