This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch 2.1.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/2.1.4-prepare by this push:
new 5c7b1f6c8 [Hotfix][Connector-V1][Kafka] When json or avro are selected
for kafka schema, a ClassCastException error is reported (#3423)
5c7b1f6c8 is described below
commit 5c7b1f6c821febd740917268a612430a5e634c0d
Author: Kirs <[email protected]>
AuthorDate: Mon Nov 14 15:24:13 2022 +0800
[Hotfix][Connector-V1][Kafka] When json or avro are selected for kafka
schema, a ClassCastException error is reported (#3423)
* [Bug] [seatunnel-connectors-flink-kafka] When json or avro are selected
for kafka schema, a ClassCastException error is reported (#3243)
* Fix logger miss
Co-authored-by: brave kong <[email protected]>
---
.../org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index dbb60c7cf..2ff6ad98d 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
@@ -111,7 +112,12 @@ public class KafkaTableStream implements FlinkStreamSource
{
}
String schemaContent = config.getString(SCHEMA);
format =
FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
- schemaInfo = JsonUtils.parseArray(schemaContent);
+ try {
+ schemaInfo = JsonUtils.stringToJsonNode(schemaContent);
+ } catch (JsonProcessingException e) {
+ LOGGER.error("schema parse error", e);
+ throw new ClassCastException(String.format("%s, cannot be cast to
com.fasterxml.jackson.databind.JsonNode.", schemaContent));
+ }
}
@Override