This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d757dcd1fc [Improve][Connector-V2][Common] Remove assert key word.
(#5915)
d757dcd1fc is described below
commit d757dcd1fc8894f92b65ceee70d2e2d98d9ed358
Author: Chengyu Yan <[email protected]>
AuthorDate: Sat Nov 25 17:58:01 2023 +0800
[Improve][Connector-V2][Common] Remove assert key word. (#5915)
---
release-note.md | 1 +
.../common/source/reader/fetcher/SplitFetcher.java | 21 ++++++++++++++++++---
2 files changed, 19 insertions(+), 3 deletions(-)
diff --git a/release-note.md b/release-note.md
index c9b9269574..9282dd97b7 100644
--- a/release-note.md
+++ b/release-note.md
@@ -119,6 +119,7 @@
- [Connector-v2] [Jdbc] Populate primary key when jdbc sink is created using
CatalogTable (#4755)
- [Connector-v2] [Neo4j] Supports neo4j sink batch write mode (#4835)
- [Transform-V2] Optimize SQL Transform package and Fix Spark type conversion
bug of transform (#4490)
+- [Connector-V2] [Common] Remove assert key word (#5915)
### CI
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
index e3f691aa4f..3609aeb86f 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
@@ -179,7 +179,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit>
implements Runnable {
}
private SplitFetcherTask getNextTaskUnsafe() {
- assert lock.isHeldByCurrentThread();
+ if (!lock.isHeldByCurrentThread()) {
+ throw new RuntimeException(
+ String.format(
+ "Unsafe invoke, the current thread[%s] has not
acquired the lock[%s].",
+ Thread.currentThread().getName(),
this.lock.toString()));
+ }
try {
if (!taskQueue.isEmpty()) {
@@ -201,7 +206,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit>
implements Runnable {
}
private void wakeUpUnsafe(boolean taskOnly) {
- assert lock.isHeldByCurrentThread();
+ if (!lock.isHeldByCurrentThread()) {
+ throw new RuntimeException(
+ String.format(
+ "Unsafe invoke, the current thread[%s] has not
acquired the lock[%s].",
+ Thread.currentThread().getName(),
this.lock.toString()));
+ }
SplitFetcherTask currentTask = runningTask;
if (currentTask != null) {
@@ -214,7 +224,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit>
implements Runnable {
}
private void addTaskUnsafe(SplitFetcherTask task) {
- assert lock.isHeldByCurrentThread();
+ if (!lock.isHeldByCurrentThread()) {
+ throw new RuntimeException(
+ String.format(
+ "Unsafe invoke, the current thread[%s] has not
acquired the lock[%s].",
+ Thread.currentThread().getName(),
this.lock.toString()));
+ }
taskQueue.add(task);
nonEmpty.signal();