This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 854d108ee [flink] Add exception stack for log store register (#2226)
854d108ee is described below
commit 854d108ee4cc558dcf1a8316ceefe83cc71d73ea
Author: Fang Yong <[email protected]>
AuthorDate: Thu Nov 2 13:46:30 2023 +0800
[flink] Add exception stack for log store register (#2226)
---
.../paimon/flink/kafka/KafkaLogStoreRegister.java | 25 +++++++++++++---------
.../flink/kafka/KafkaLogStoreRegisterITCase.java | 11 +++++-----
2 files changed, 21 insertions(+), 15 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
index cbff022a3..c93889b66 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
@@ -111,13 +111,15 @@ public class KafkaLogStoreRegister implements
LogStoreRegister {
} catch (TimeoutException e) {
throw new IllegalStateException(
String.format(
- "Register topic for table %s timeout %s",
- this.identifier.getFullName(), e.getMessage()));
+ "Register topic for table %s timeout with
properties %s",
+ this.identifier.getFullName(), properties),
+ e);
} catch (Exception e) {
throw new IllegalStateException(
String.format(
- "Register topic for table %s exception %s",
- this.identifier.getFullName(), e.getMessage()));
+ "Register topic for table %s failed with
properties %s",
+ this.identifier.getFullName(), properties),
+ e);
}
return ImmutableMap.of(
@@ -143,19 +145,22 @@ public class KafkaLogStoreRegister implements
LogStoreRegister {
} else {
throw new IllegalStateException(
String.format(
- "Unregister topic for table %s exception %s",
- this.identifier.getFullName(),
e.getMessage()));
+ "Unregister topic for table %s failed with
properties %s",
+ this.identifier.getFullName(), properties),
+ e);
}
} catch (TimeoutException e) {
throw new RuntimeException(
String.format(
- "Unregister topic for table %s timeout %s",
- this.identifier.getFullName(), e.getMessage()));
+ "Unregister topic for table %s timeout with
properties %s",
+ this.identifier.getFullName(), properties),
+ e);
} catch (Exception e) {
throw new RuntimeException(
String.format(
- "Unregister topic for table %s exception %s",
- this.identifier.getFullName(), e.getMessage()));
+ "Unregister topic for table %s failed with
properties %s",
+ this.identifier.getFullName(), properties),
+ e);
}
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
index 30e7d1de1..e92973f32 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
@@ -24,6 +24,8 @@ import org.apache.paimon.options.Options;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -101,11 +103,10 @@ public class KafkaLogStoreRegisterITCase extends
KafkaTableTestBase {
KafkaLogStoreRegister kafkaLogStoreRegister =
createKafkaLogStoreRegister(invalidBootstrapServers, topic);
-
assertThatThrownBy(kafkaLogStoreRegister::registerTopic)
.isInstanceOf(IllegalStateException.class)
- .hasMessageContaining(
- "Register topic for table mock_db.mock_table exception
Failed to create new KafkaAdminClient");
+ .hasMessageContaining("Register topic for table
mock_db.mock_table failed")
+ .hasRootCauseInstanceOf(ConfigException.class);
}
@Test
@@ -118,8 +119,8 @@ public class KafkaLogStoreRegisterITCase extends
KafkaTableTestBase {
createKafkaLogStoreRegister(getBootstrapServers(), topic)
.registerTopic())
.isInstanceOf(IllegalStateException.class)
- .hasMessage(
- "Register topic for table mock_db.mock_table exception
org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-exist'
already exists.");
+ .hasMessageContaining("Register topic for table
mock_db.mock_table failed")
+ .hasRootCauseInstanceOf(TopicExistsException.class);
}
@Test