This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3b432125ae [Fix][Connector-V2] Fix cdc use default value when value is
null (#7950)
3b432125ae is described below
commit 3b432125ae2227b08308c9933f8708e2049d9b81
Author: Jia Fan <[email protected]>
AuthorDate: Thu Oct 31 13:51:20 2024 +0800
[Fix][Connector-V2] Fix cdc use default value when value is null (#7950)
---
...TunnelRowDebeziumDeserializationConverters.java | 4 +-
...elRowDebeziumDeserializationConvertersTest.java | 78 ++++++++++++++++++++++
.../seatunnel/prometheus/sink/PrometheusSink.java | 17 +++--
.../prometheus/sink/PrometheusSinkFactory.java | 2 +-
4 files changed, 93 insertions(+), 8 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
index 0a2fb09cf8..89b9c50c30 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
@@ -85,7 +85,7 @@ public class SeaTunnelRowDebeziumDeserializationConverters
implements Serializab
if (field == null) {
row.setField(i, null);
} else {
- Object fieldValue = struct.get(fieldName);
+ Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = field.schema();
Object convertedField =
SeaTunnelRowDebeziumDeserializationConverters.convertField(
@@ -494,11 +494,11 @@ public class
SeaTunnelRowDebeziumDeserializationConverters implements Serializab
SeaTunnelRow row = new SeaTunnelRow(arity);
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
- Object fieldValue = struct.get(fieldName);
Field field = schema.field(fieldName);
if (field == null) {
row.setField(i, null);
} else {
+ Object fieldValue =
struct.getWithoutDefault(fieldName);
Schema fieldSchema = field.schema();
Object convertedField =
SeaTunnelRowDebeziumDeserializationConverters.convertField(
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
new file mode 100644
index 0000000000..74e832d6e0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.row;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
+import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class SeaTunnelRowDebeziumDeserializationConvertersTest {
+
+ @Test
+ void testDefaultValueNotUsed() throws Exception {
+ SeaTunnelRowDebeziumDeserializationConverters converters =
+ new SeaTunnelRowDebeziumDeserializationConverters(
+ new SeaTunnelRowType(
+ new String[] {"id", "name"},
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE, BasicType.STRING_TYPE
+ }),
+ new MetadataConverter[] {},
+ ZoneId.systemDefault(),
+ DebeziumDeserializationConverterFactory.DEFAULT);
+ Schema schema =
+ SchemaBuilder.struct()
+ .field("id", SchemaBuilder.int32().build())
+ .field("name",
SchemaBuilder.string().defaultValue("UL"))
+ .build();
+ Struct value = new Struct(schema);
+ // the value of `name` is null, so do not put value for it
+ value.put("id", 1);
+ SourceRecord record =
+ new SourceRecord(
+ new HashMap<>(),
+ new HashMap<>(),
+ "topicName",
+ null,
+ SchemaBuilder.int32().build(),
+ 1,
+ schema,
+ value,
+ null,
+ new ArrayList<>());
+
+ SeaTunnelRow row = converters.convert(record, value, schema);
+ Assertions.assertEquals(row.getField(0), 1);
+ Assertions.assertNull(row.getField(1));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java
b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java
index 93d4e931e1..35ec257fc9 100644
---
a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java
+++
b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java
@@ -19,8 +19,8 @@ package
org.apache.seatunnel.connectors.seatunnel.prometheus.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
@@ -28,15 +28,16 @@ import
org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
public class PrometheusSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
protected final HttpParameter httpParameter = new HttpParameter();
- protected SeaTunnelRowType seaTunnelRowType;
+ protected CatalogTable catalogTable;
protected ReadonlyConfig pluginConfig;
- public PrometheusSink(ReadonlyConfig pluginConfig, SeaTunnelRowType
rowType) {
+ public PrometheusSink(ReadonlyConfig pluginConfig, CatalogTable
catalogTable) {
this.pluginConfig = pluginConfig;
httpParameter.setUrl(pluginConfig.get(HttpConfig.URL));
if (pluginConfig.getOptional(HttpConfig.HEADERS).isPresent()) {
@@ -45,7 +46,7 @@ public class PrometheusSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
if (pluginConfig.getOptional(HttpConfig.PARAMS).isPresent()) {
httpParameter.setHeaders(pluginConfig.get(HttpConfig.PARAMS));
}
- this.seaTunnelRowType = rowType;
+ this.catalogTable = catalogTable;
if (Objects.isNull(httpParameter.getHeaders())) {
Map<String, String> headers = new HashMap<>();
@@ -67,6 +68,12 @@ public class PrometheusSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
@Override
public PrometheusWriter createWriter(SinkWriter.Context context) {
- return new PrometheusWriter(seaTunnelRowType, httpParameter,
pluginConfig);
+ return new PrometheusWriter(
+ catalogTable.getSeaTunnelRowType(), httpParameter,
pluginConfig);
+ }
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSinkFactory.java
b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSinkFactory.java
index dcd8e72c1a..544f17c9a6 100644
---
a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSinkFactory.java
@@ -39,7 +39,7 @@ public class PrometheusSinkFactory extends HttpSinkFactory {
ReadonlyConfig readonlyConfig = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
- return () -> new PrometheusSink(readonlyConfig,
catalogTable.getSeaTunnelRowType());
+ return () -> new PrometheusSink(readonlyConfig, catalogTable);
}
@Override