This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4fe932341 [Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &… (#3590)
4fe932341 is described below

commit 4fe9323419e52af60dea6a9b17f8cc8cd6c0d499
Author: john <[email protected]>
AuthorDate: Sat Dec 3 21:29:07 2022 +0800

    [Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &… (#3590)
    
    * [Improve][Connector-V2][Pulsar] Unified exception for Pulsar source and sink
---
 .../connector-v2/Error-Quick-Reference-Manual.md   | 14 ++++++-
 .../seatunnel/pulsar/config/PulsarConfigUtil.java  | 11 +++--
 .../pulsar/exception/PulsarConnectorErrorCode.java | 48 ++++++++++++++++++++++
 .../pulsar/exception/PulsarConnectorException.java | 35 ++++++++++++++++
 .../seatunnel/pulsar/source/PulsarSource.java      | 20 ++++-----
 .../source/enumerator/PulsarSplitEnumerator.java   |  6 ++-
 .../cursor/start/SubscriptionStartCursor.java      |  4 +-
 .../cursor/stop/LatestMessageStopCursor.java       |  4 +-
 .../enumerator/discoverer/TopicListDiscoverer.java |  4 +-
 .../discoverer/TopicPatternDiscoverer.java         |  6 ++-
 .../pulsar/source/reader/PulsarSourceReader.java   |  8 ++--
 .../source/reader/PulsarSplitReaderThread.java     |  4 +-
 12 files changed, 138 insertions(+), 26 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 4ed61a0df..4b5ef6b10 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -168,4 +168,16 @@ problems encountered by users.
 | CLICKHOUSE-03 | Can’t delete directory                                       
             | When users encounter this error code, it means that the 
directory does not exist or does not have permission, please check              
                                |
 | CLICKHOUSE-04 | Ssh operation failed, such as 
(login,connect,authentication,close) etc... | When users encounter this error 
code, it means that the ssh request failed, please check your network 
environment                                                       |
 | CLICKHOUSE-05 | Get cluster list from clickhouse failed                      
             | When users encounter this error code, it means that the 
clickhouse cluster is not configured correctly, please check                    
                                |
-| CLICKHOUSE-06 | Shard key not found in table                                 
             | When users encounter this error code, it means that the shard 
key of the distributed table is not configured, please check                    
                          |
\ No newline at end of file
+| CLICKHOUSE-06 | Shard key not found in table                                 
             | When users encounter this error code, it means that the shard 
key of the distributed table is not configured, please check                    
                          |
+
+## Pulsar Connector Error Codes
+
+| code      | description                                      | solution                                                                                                                               |
+|-----------|--------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
+| PULSAR-01 | Open pulsar admin failed                         | When users encounter this error code, it means that opening the Pulsar admin failed, please check it                                   |
+| PULSAR-02 | Open pulsar client failed                        | When users encounter this error code, it means that opening the Pulsar client failed, please check it                                  |
+| PULSAR-03 | Pulsar authentication failed                     | When users encounter this error code, it means that Pulsar authentication failed, please check it                                      |
+| PULSAR-04 | Subscribe topic from pulsar failed               | When users encounter this error code, it means that subscribing to the Pulsar topic failed, please check it                            |
+| PULSAR-05 | Get last cursor of pulsar topic failed           | When users encounter this error code, it means that getting the last cursor of the Pulsar topic failed, please check it                |
+| PULSAR-06 | Get partition information of pulsar topic failed | When users encounter this error code, it means that getting the partition information of the Pulsar topic failed, please check it      |
+
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
index 85703ad56..287d38e45 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
 
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.Authentication;
@@ -43,7 +46,7 @@ public class PulsarConfigUtil {
         try {
             return builder.build();
         } catch (PulsarClientException e) {
-            throw new RuntimeException(e);
+            throw new 
PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED, e);
         }
     }
 
@@ -54,7 +57,7 @@ public class PulsarConfigUtil {
         try {
             return builder.build();
         } catch (PulsarClientException e) {
-            throw new RuntimeException(e);
+            throw new 
PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_CLIENT_FAILED, e);
         }
     }
 
@@ -74,10 +77,10 @@ public class PulsarConfigUtil {
             try {
                 return 
AuthenticationFactory.create(config.getAuthPluginClassName(), 
config.getAuthParams());
             } catch (PulsarClientException.UnsupportedAuthenticationException 
e) {
-                throw new RuntimeException("Failed to create the 
authentication plug-in.", e);
+                throw new 
PulsarConnectorException(PulsarConnectorErrorCode.PULSAR_AUTHENTICATION_FAILED, 
e);
             }
         } else {
-            throw new IllegalArgumentException("Authentication parameters are 
required when using authentication plug-in.");
+            throw new 
PulsarConnectorException(PulsarConnectorErrorCode.PULSAR_AUTHENTICATION_FAILED, 
"Authentication parameters are required when using authentication plug-in.");
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
new file mode 100644
index 000000000..513e5dac6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum PulsarConnectorErrorCode implements SeaTunnelErrorCode {
+
+    OPEN_PULSAR_ADMIN_FAILED("PULSAR-01", "Open pulsar admin failed"),
+    OPEN_PULSAR_CLIENT_FAILED("PULSAR-02", "Open pulsar client failed"),
+    PULSAR_AUTHENTICATION_FAILED("PULSAR-03", "Pulsar authentication failed"),
+    SUBSCRIBE_TOPIC_FAILED("PULSAR-04", "Subscribe topic from pulsar failed"),
+    GET_LAST_CURSOR_FAILED("PULSAR-05", "Get last cursor of pulsar topic 
failed"),
+    GET_TOPIC_PARTITION_FAILED("PULSAR-06", "Get partition information of 
pulsar topic failed");
+
+    private final String code;
+    private final String description;
+
+    PulsarConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorException.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorException.java
new file mode 100644
index 000000000..687d32896
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class PulsarConnectorException extends SeaTunnelRuntimeException {
+    public PulsarConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public PulsarConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public PulsarConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index 769e1143a..fe35f4bac 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -33,13 +33,12 @@ import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProp
 import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
 import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode.LATEST;
-import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StopMode.NEVER;
 import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
 import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
 import static 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -56,6 +55,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfi
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
@@ -103,7 +103,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T, 
PulsarPartitionSplit,
     public void prepare(Config config) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(config, 
SUBSCRIPTION_NAME.key(), CLIENT_SERVICE_URL.key(), ADMIN_SERVICE_URL.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+            throw new 
PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, 
String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), 
PluginType.SOURCE, result.getMsg()));
         }
 
         // admin config
@@ -155,7 +155,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T, 
PulsarPartitionSplit,
         if (partitionDiscoverer instanceof TopicPatternDiscoverer
             && partitionDiscoveryIntervalMs > 0
             && Boundedness.BOUNDED == stopCursor.getBoundedness()) {
-            throw new IllegalArgumentException("Bounded streams do not support 
dynamic partition discovery.");
+            throw new 
PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, 
"Bounded streams do not support dynamic partition discovery.");
         }
     }
 
@@ -177,12 +177,12 @@ public class PulsarSource<T> implements 
SeaTunnelSource<T, PulsarPartitionSplit,
                 break;
             case TIMESTAMP:
                 if 
(StringUtils.isBlank(config.getString(CURSOR_STARTUP_TIMESTAMP.key()))) {
-                    throw new IllegalArgumentException(String.format("The '%s' 
property is required when the '%s' is 'timestamp'.", 
CURSOR_STARTUP_TIMESTAMP.key(), CURSOR_STARTUP_MODE.key()));
+                    throw new 
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, 
String.format("The '%s' property is required when the '%s' is 'timestamp'.", 
CURSOR_STARTUP_TIMESTAMP.key(), CURSOR_STARTUP_MODE.key()));
                 }
                 setOption(config, CURSOR_STARTUP_TIMESTAMP.key(), 
config::getLong, timestamp -> this.startCursor = 
StartCursor.timestamp(timestamp));
                 break;
             default:
-                throw new IllegalArgumentException(String.format("The %s mode 
is not supported.", startMode));
+                throw new 
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, 
String.format("The %s mode is not supported.", startMode));
         }
     }
 
@@ -197,12 +197,12 @@ public class PulsarSource<T> implements 
SeaTunnelSource<T, PulsarPartitionSplit,
                 break;
             case TIMESTAMP:
                 if 
(StringUtils.isBlank(config.getString(CURSOR_STOP_TIMESTAMP.key()))) {
-                    throw new IllegalArgumentException(String.format("The '%s' 
property is required when the '%s' is 'timestamp'.", 
CURSOR_STOP_TIMESTAMP.key(), CURSOR_STOP_MODE.key()));
+                    throw new 
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, 
String.format("The '%s' property is required when the '%s' is 'timestamp'.", 
CURSOR_STOP_TIMESTAMP.key(), CURSOR_STOP_MODE.key()));
                 }
                 setOption(config, CURSOR_STARTUP_TIMESTAMP.key(), 
config::getLong, timestamp -> this.stopCursor = 
StopCursor.timestamp(timestamp));
                 break;
             default:
-                throw new IllegalArgumentException(String.format("The %s mode 
is not supported.", stopMode));
+                throw new 
PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, 
String.format("The %s mode is not supported.", stopMode));
         }
     }
 
@@ -214,12 +214,12 @@ public class PulsarSource<T> implements 
SeaTunnelSource<T, PulsarPartitionSplit,
         String topicPattern = config.getString(TOPIC_PATTERN.key());
         if (StringUtils.isNotBlank(topicPattern)) {
             if (this.partitionDiscoverer != null) {
-                throw new IllegalArgumentException(String.format("The 
properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key()));
+                throw new 
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, 
String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(), 
TOPIC_PATTERN.key()));
             }
             this.partitionDiscoverer = new 
TopicPatternDiscoverer(Pattern.compile(topicPattern));
         }
         if (this.partitionDiscoverer == null) {
-            throw new IllegalArgumentException(String.format("The properties 
'%s' or '%s' is required.", TOPIC.key(), TOPIC_PATTERN.key()));
+            throw new 
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, 
String.format("The properties '%s' or '%s' is required.", TOPIC.key(), 
TOPIC_PATTERN.key()));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
index 8b7f6f8dd..c9d8a0d0e 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
@@ -19,8 +19,10 @@ package 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator;
 
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
@@ -107,10 +109,10 @@ public class PulsarSplitEnumerator implements 
SourceSplitEnumerator<PulsarPartit
                                  StopCursor stopCursor,
                                  String subscriptionName,
                                  Set<TopicPartition> assignedPartitions) {
-        if ((partitionDiscoverer instanceof TopicPatternDiscoverer)
+        if (partitionDiscoverer instanceof TopicPatternDiscoverer
             && partitionDiscoveryIntervalMs > 0
             && Boundedness.BOUNDED == stopCursor.getBoundedness()) {
-            throw new IllegalArgumentException("Bounded streams do not support 
dynamic partition discovery.");
+            throw new 
PulsarConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, "Bounded 
streams do not support dynamic partition discovery.");
         }
         this.context = context;
         this.adminConfig = adminConfig;
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
index e62d24079..a6c053c4f 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
@@ -18,6 +18,8 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
 
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -48,7 +50,7 @@ public class SubscriptionStartCursor implements StartCursor {
             }
             
pulsarAdmin.topics().createSubscription(partition.getFullTopicName(), 
subscription, CursorResetStrategy.EARLIEST == cursorResetStrategy ? 
MessageId.earliest : MessageId.latest);
         } catch (PulsarAdminException e) {
-            throw new RuntimeException(e);
+            throw new 
PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED, e);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
index 14aeeff28..51a7fdb5d 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
@@ -18,6 +18,8 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop;
 
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -41,7 +43,7 @@ public class LatestMessageStopCursor implements StopCursor {
             try {
                 messageId = admin.topics().getLastMessageId(topic);
             } catch (PulsarAdminException e) {
-                throw new RuntimeException("Failed to get the last cursor", e);
+                throw new 
PulsarConnectorException(PulsarConnectorErrorCode.GET_LAST_CURSOR_FAILED, 
"Failed to get the last cursor", e);
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
index b362446d0..3dcf042e6 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
@@ -18,6 +18,8 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer;
 
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -53,7 +55,7 @@ public class TopicListDiscoverer implements PulsarDiscoverer {
                     return PulsarDiscoverer.toTopicPartitions(topicName, 
metadata.partitions);
                 } catch (PulsarAdminException e) {
                     // This method would cause the failure for subscriber.
-                    throw new IllegalStateException(e);
+                    throw new 
PulsarConnectorException(PulsarConnectorErrorCode.SUBSCRIBE_TOPIC_FAILED, e);
                 }
             })
             .filter(Objects::nonNull)
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
index d5eaedac5..d6a22cdea 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
@@ -17,6 +17,8 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer;
 
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -71,14 +73,14 @@ public class TopicPatternDiscoverer implements 
PulsarDiscoverer {
                         return PulsarDiscoverer.toTopicPartitions(topicName, 
metadata.partitions);
                     } catch (PulsarAdminException e) {
                         // This method would cause the failure for subscriber.
-                        throw new IllegalStateException(e);
+                        throw new 
PulsarConnectorException(PulsarConnectorErrorCode.GET_TOPIC_PARTITION_FAILED, 
e);
                     }
                 }).filter(Objects::nonNull)
                 .flatMap(Collection::stream)
                 .collect(Collectors.toSet());
         } catch (PulsarAdminException e) {
             // This method would cause the failure for subscriber.
-            throw new IllegalStateException(e);
+            throw new 
PulsarConnectorException(PulsarConnectorErrorCode.GET_TOPIC_PARTITION_FAILED, 
e);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
index ddb2e9147..af3cf5120 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
@@ -21,9 +21,11 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.common.Handover;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
 
@@ -113,7 +115,7 @@ public class PulsarSourceReader<T> implements 
SourceReader<T, PulsarPartitionSpl
             try {
                 reader.close();
             } catch (IOException e) {
-                throw new RuntimeException("Failed to close the split reader 
thread.", e);
+                throw new 
PulsarConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "Failed to 
close the split reader thread.", e);
             }
         });
     }
@@ -166,7 +168,7 @@ public class PulsarSourceReader<T> implements 
SourceReader<T, PulsarPartitionSpl
                 splitReaders.put(split.splitId(), splitReaderThread);
                 splitReaderThread.start();
             } catch (PulsarClientException e) {
-                throw new RuntimeException("Failed to start the split reader 
thread.", e);
+                throw new 
PulsarConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "Failed to 
start the split reader thread.", e);
             }
         });
     }
@@ -216,7 +218,7 @@ public class PulsarSourceReader<T> implements 
SourceReader<T, PulsarPartitionSpl
                 try {
                     splitReaders.get(splitId).close();
                 } catch (IOException e) {
-                    throw new RuntimeException("Failed to close the split 
reader thread.", e);
+                    throw new 
PulsarConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "Failed to 
close the split reader thread.", e);
                 }
             }
         });
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
index 9817046b7..0fe1e5e26 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
@@ -20,6 +20,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;
 import org.apache.seatunnel.common.Handover;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
@@ -137,7 +139,7 @@ public class PulsarSplitReaderThread extends Thread 
implements Closeable {
         try {
             return consumerBuilder.subscribe();
         } catch (PulsarClientException e) {
-            throw new RuntimeException("Failed to create pulsar consumer:", e);
+            throw new 
PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED, 
"Failed to create pulsar consumer:", e);
         }
     }
 

Reply via email to