This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 854d108ee [flink] Add exception stack for log store register (#2226)
854d108ee is described below

commit 854d108ee4cc558dcf1a8316ceefe83cc71d73ea
Author: Fang Yong <[email protected]>
AuthorDate: Thu Nov 2 13:46:30 2023 +0800

    [flink] Add exception stack for log store register (#2226)
---
 .../paimon/flink/kafka/KafkaLogStoreRegister.java  | 25 +++++++++++++---------
 .../flink/kafka/KafkaLogStoreRegisterITCase.java   | 11 +++++-----
 2 files changed, 21 insertions(+), 15 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
index cbff022a3..c93889b66 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
@@ -111,13 +111,15 @@ public class KafkaLogStoreRegister implements 
LogStoreRegister {
         } catch (TimeoutException e) {
             throw new IllegalStateException(
                     String.format(
-                            "Register topic for table %s timeout %s",
-                            this.identifier.getFullName(), e.getMessage()));
+                            "Register topic for table %s timeout with 
properties %s",
+                            this.identifier.getFullName(), properties),
+                    e);
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
-                            "Register topic for table %s exception %s",
-                            this.identifier.getFullName(), e.getMessage()));
+                            "Register topic for table %s failed with 
properties %s",
+                            this.identifier.getFullName(), properties),
+                    e);
         }
 
         return ImmutableMap.of(
@@ -143,19 +145,22 @@ public class KafkaLogStoreRegister implements 
LogStoreRegister {
             } else {
                 throw new IllegalStateException(
                         String.format(
-                                "Unregister topic for table %s exception %s",
-                                this.identifier.getFullName(), 
e.getMessage()));
+                                "Unregister topic for table %s failed with 
properties %s",
+                                this.identifier.getFullName(), properties),
+                        e);
             }
         } catch (TimeoutException e) {
             throw new RuntimeException(
                     String.format(
-                            "Unregister topic for table %s timeout %s",
-                            this.identifier.getFullName(), e.getMessage()));
+                            "Unregister topic for table %s timeout with 
properties %s",
+                            this.identifier.getFullName(), properties),
+                    e);
         } catch (Exception e) {
             throw new RuntimeException(
                     String.format(
-                            "Unregister topic for table %s exception %s",
-                            this.identifier.getFullName(), e.getMessage()));
+                            "Unregister topic for table %s failed with 
properties %s",
+                            this.identifier.getFullName(), properties),
+                    e);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
index 30e7d1de1..e92973f32 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
@@ -24,6 +24,8 @@ import org.apache.paimon.options.Options;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
@@ -101,11 +103,10 @@ public class KafkaLogStoreRegisterITCase extends 
KafkaTableTestBase {
 
         KafkaLogStoreRegister kafkaLogStoreRegister =
                 createKafkaLogStoreRegister(invalidBootstrapServers, topic);
-
         assertThatThrownBy(kafkaLogStoreRegister::registerTopic)
                 .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining(
-                        "Register topic for table mock_db.mock_table exception 
Failed to create new KafkaAdminClient");
+                .hasMessageContaining("Register topic for table 
mock_db.mock_table failed")
+                .hasRootCauseInstanceOf(ConfigException.class);
     }
 
     @Test
@@ -118,8 +119,8 @@ public class KafkaLogStoreRegisterITCase extends 
KafkaTableTestBase {
                                 
createKafkaLogStoreRegister(getBootstrapServers(), topic)
                                         .registerTopic())
                 .isInstanceOf(IllegalStateException.class)
-                .hasMessage(
-                        "Register topic for table mock_db.mock_table exception 
org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-exist' 
already exists.");
+                .hasMessageContaining("Register topic for table 
mock_db.mock_table failed")
+                .hasRootCauseInstanceOf(TopicExistsException.class);
     }
 
     @Test

Reply via email to