This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b9074e769 [Hotfix] Fix redis sink NPE (#8171)
6b9074e769 is described below
commit 6b9074e7699d48be970f788fc0eb3d8beb42a86f
Author: limin <[email protected]>
AuthorDate: Mon Dec 2 19:24:11 2024 +0800
[Hotfix] Fix redis sink NPE (#8171)
---
.../seatunnel/redis/sink/RedisSinkWriter.java | 15 ++--
.../connector/redis/RedisTestCaseTemplateIT.java | 76 +++++++++++++++++++
.../fake-to-redis-test-custom-key-is-null.conf | 87 +++++++++++++++++++++
...is-test-custom-value-when-hash-key-is-null.conf | 87 +++++++++++++++++++++
...-test-custom-value-when-hash-value-is-null.conf | 88 ++++++++++++++++++++++
...-test-custom-value-when-other-type-is-null.conf | 87 +++++++++++++++++++++
.../fake-to-redis-test-normal-key-is-null.conf | 86 +++++++++++++++++++++
7 files changed, 521 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index b42fc6107b..71739b5789 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -94,7 +94,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
private static String getNormalKey(SeaTunnelRow element, List<String> fields, String keyField) {
if (fields.contains(keyField)) {
- return element.getField(fields.indexOf(keyField)).toString();
+ Object fieldValue = element.getField(fields.indexOf(keyField));
+ return fieldValue == null ? "" : fieldValue.toString();
} else {
return keyField;
}
@@ -109,7 +110,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
&& keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) {
String realKeyField = keyFieldSegment.substring(1, keyFieldSegment.length() - 1);
if (fields.contains(realKeyField)) {
- key.append(element.getField(fields.indexOf(realKeyField)).toString());
+ Object realFieldValue = element.getField(fields.indexOf(realKeyField));
+ key.append(realFieldValue == null ? "" : realFieldValue.toString());
} else {
key.append(keyFieldSegment);
}
@@ -146,7 +148,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
}
String hashKey;
if (fields.contains(hashKeyField)) {
- hashKey = element.getField(fields.indexOf(hashKeyField)).toString();
+ Object hashKeyFieldValue = element.getField(fields.indexOf(hashKeyField));
+ hashKey = hashKeyFieldValue == null ? "" : hashKeyFieldValue.toString();
} else {
hashKey = hashKeyField;
}
@@ -155,7 +158,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
hashValue = new String(serializationSchema.serialize(element));
} else {
if (fields.contains(hashValueField)) {
- hashValue = element.getField(fields.indexOf(hashValueField)).toString();
+ Object hashValueFieldValue = element.getField(fields.indexOf(hashValueField));
+ hashValue = hashValueFieldValue == null ? "" : hashValueFieldValue.toString();
} else {
hashValue = hashValueField;
}
@@ -171,7 +175,8 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
return null;
}
if (fields.contains(valueField)) {
- return element.getField(fields.indexOf(valueField)).toString();
+ Object fieldValue = element.getField(fields.indexOf(valueField));
+ return fieldValue == null ? "" : fieldValue.toString();
}
return valueField;
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index efd9d8df44..96ac20cbe6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -501,5 +501,81 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
jedis.del("zset_check");
}
+ @TestTemplate
+ public void testFakeToRedisNormalKeyIsNullTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake-to-redis-test-normal-key-is-null.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ int count = 0;
+ String data = jedis.get("");
+ if (data != null) {
+ count++;
+ jedis.del("");
+ }
+ for (int i = 2; i <= 3; i++) {
+ data = jedis.get("NEW" + i);
+ if (data != null) {
+ count++;
+ jedis.del("NEW" + i);
+ }
+ }
+ Assertions.assertEquals(2, count);
+ }
+
+ @TestTemplate
+ public void testFakeToRedisCustomKeyIsNullTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake-to-redis-test-custom-key-is-null.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ int count = 0;
+ String data = jedis.get("key_check:");
+ if (data != null) {
+ count++;
+ jedis.del("key_check:");
+ }
+ for (int i = 2; i <= 3; i++) {
+ data = jedis.get("key_check:NEW" + i);
+ if (data != null) {
+ count++;
+ jedis.del("key_check:NEW" + i);
+ }
+ }
+ Assertions.assertEquals(2, count);
+ }
+
+ @TestTemplate
+ public void testFakeToRedisOtherTypeValueIsNullTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob(
+ "/fake-to-redis-test-custom-value-when-other-type-is-null.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(2, jedis.llen("list_check"));
+ jedis.del("list_check");
+ }
+
+ @TestTemplate
+ public void testFakeToRedisHashTypeKeyIsNullTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/fake-to-redis-test-custom-value-when-hash-key-is-null.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(2, jedis.hlen("hash_check"));
+ jedis.del("hash_check");
+ }
+
+ @TestTemplate
+ public void testFakeToRedisHashTypeValueIsNullTest(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob(
+ "/fake-to-redis-test-custom-value-when-hash-value-is-null.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(2, jedis.hlen("hash_check"));
+ jedis.del("hash_check");
+ }
+
public abstract RedisContainerInfo getRedisContainerInfo();
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
new file mode 100644
index 0000000000..20ec18450a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "key_check:{val_string}"
+ data_type = key
+ support_custom_key = true
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
new file mode 100644
index 0000000000..774b4aa379
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "hash_check"
+ data_type = hash
+ hash_key_field = "val_string"
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
new file mode 100644
index 0000000000..8d3c2fee7a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "hash_check"
+ data_type = hash
+ hash_key_field = "id"
+ hash_value_field = "val_string"
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
new file mode 100644
index 0000000000..1eab8030d9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "list_check"
+ data_type = list
+ value_field = "val_string"
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-normal-key-is-null.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-normal-key-is-null.conf
new file mode 100644
index 0000000000..b9123c294a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-normal-key-is-null.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, "2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "val_string"
+ data_type = key
+ batch_size = 33
+ }
+}
\ No newline at end of file