This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new edb14108a [Bug] [seatunnel-connectors-flink-kafka] When json or avro 
are selected for kafka schema, a ClassCastException error is reported  (#3243)
edb14108a is described below

commit edb14108a6a6117d80da378b81702db4cc965306
Author: brave kong <[email protected]>
AuthorDate: Fri Nov 4 10:59:15 2022 +0800

    [Bug] [seatunnel-connectors-flink-kafka] When json or avro are selected for 
kafka schema, a ClassCastException error is reported  (#3243)
---
 .../org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index f962fee4d..111d4be5b 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.flink.util.TableUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
@@ -110,7 +111,12 @@ public class KafkaTableStream implements FlinkStreamSource 
{
         }
         String schemaContent = config.getString(SCHEMA);
         format = 
FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
-        schemaInfo = JsonUtils.parseArray(schemaContent);
+        try {
+            schemaInfo = JsonUtils.stringToJsonNode(schemaContent);
+        } catch (JsonProcessingException e) {
+            log.error("schema parse error", e);
+            throw new ClassCastException(String.format("%s, cannot be cast to 
com.fasterxml.jackson.databind.JsonNode.", schemaContent));
+        }
     }
 
     @Override

Reply via email to