This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fea4b6f268 [Fix] Fix log error when multi-table sink close (#5683)
fea4b6f268 is described below

commit fea4b6f268e2229bcd31fae02dc684cdef8fe259
Author: Jia Fan <[email protected]>
AuthorDate: Wed Oct 25 18:10:07 2023 +0800

    [Fix] Fix log error when multi-table sink close (#5683)
---
 .../seatunnel/assertion/sink/AssertSink.java       |  4 +-
 .../seatunnel/assertion/sink/AssertSinkWriter.java |  4 +-
 .../multitablesink/MultiTableSinkWriter.java       | 17 +++++
 .../multitablesink/MultiTableWriterRunnable.java   |  5 ++
 .../e2e/connector/fake/FakeWithMultiTableTT.java   | 12 +++
 .../fake_to_assert_with_multitable_exception.conf  | 85 ++++++++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcPostgresIT.java  |  8 +-
 .../e2e/common/container/TestContainer.java        |  2 +
 .../flink/AbstractTestFlinkContainer.java          |  5 ++
 .../container/seatunnel/SeaTunnelContainer.java    |  5 ++
 .../spark/AbstractTestSparkContainer.java          |  5 ++
 11 files changed, 143 insertions(+), 9 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 97106bea02..d65398c9de 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -50,7 +51,8 @@ import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
 
 @AutoService(SeaTunnelSink.class)
-public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
     private SeaTunnelRowType seaTunnelRowType;
     private CatalogTable catalogTable;
     private List<AssertFieldRule> assertFieldRules;
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
index c0bc0d9718..7d7968ad00 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
 
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.excecutor.AssertExecutor;
@@ -33,7 +34,8 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.LongAccumulator;
 
-public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
 
     private final SeaTunnelRowType seaTunnelRowType;
     private final List<AssertFieldRule> assertFieldRules;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
index 765f58785c..fad9c2ffa9 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
@@ -156,6 +156,7 @@ public class MultiTableSinkWriter
 
     @Override
     public List<MultiTableState> snapshotState(long checkpointId) throws 
IOException {
+        checkQueueRemain();
         subSinkErrorCheck();
         List<MultiTableState> multiTableStates = new ArrayList<>();
         MultiTableState multiTableState = new MultiTableState(new HashMap<>());
@@ -174,6 +175,7 @@ public class MultiTableSinkWriter
 
     @Override
     public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
+        checkQueueRemain();
         subSinkErrorCheck();
         MultiTableCommitInfo multiTableCommitInfo = new 
MultiTableCommitInfo(new HashMap<>());
         for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
@@ -194,6 +196,7 @@ public class MultiTableSinkWriter
 
     @Override
     public void abortPrepare() {
+        checkQueueRemain();
         Throwable firstE = null;
         for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
             synchronized (runnable.get(i)) {
@@ -217,6 +220,7 @@ public class MultiTableSinkWriter
 
     @Override
     public void close() throws IOException {
+        checkQueueRemain();
         executorService.shutdownNow();
         Throwable firstE = null;
         for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
@@ -243,4 +247,17 @@ public class MultiTableSinkWriter
             throw new RuntimeException(firstE);
         }
     }
+
+    private void checkQueueRemain() {
+        try {
+            for (BlockingQueue<SeaTunnelRow> blockingQueue : blockingQueues) {
+                while (!blockingQueue.isEmpty()) {
+                    Thread.sleep(100);
+                    subSinkErrorCheck();
+                }
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
index f5895c374e..1fa681f0fa 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
@@ -61,6 +61,11 @@ public class MultiTableWriterRunnable implements Runnable {
                 synchronized (this) {
                     writer.write(row);
                 }
+            } catch (InterruptedException e) {
+                // When the job finishes, this thread is interrupted. That is expected, so we
+                // skip the error log, record the exception, and stop the loop.
+                throwable = e;
+                break;
             } catch (Exception e) {
                 log.error("MultiTableWriterRunnable error", e);
                 throwable = e;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithMultiTableTT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithMultiTableTT.java
index 1f41385b5a..2825f9d7da 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithMultiTableTT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithMultiTableTT.java
@@ -38,6 +38,18 @@ public class FakeWithMultiTableTT extends TestSuiteBase {
             throws IOException, InterruptedException {
         Container.ExecResult fakeWithTableNames =
                 
container.executeJob("/fake_to_console_with_multitable_mode.conf");
+        Assertions.assertFalse(
+                container.getServerLogs().contains("MultiTableWriterRunnable 
error"));
         Assertions.assertEquals(0, fakeWithTableNames.getExitCode());
+
+        Container.ExecResult fakeWithException =
+                
container.executeJob("/fake_to_assert_with_multitable_exception.conf");
+        
Assertions.assertTrue(container.getServerLogs().contains("MultiTableWriterRunnable
 error"));
+        Assertions.assertTrue(
+                container
+                        .getServerLogs()
+                        .contains(
+                                "at 
org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java"));
+        Assertions.assertEquals(1, fakeWithException.getExitCode());
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_multitable_exception.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_multitable_exception.conf
new file mode 100644
index 0000000000..d4af80d024
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_multitable_exception.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 200]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      field_rules = [
+        {
+          field_name = score
+          field_type = int
+          field_value = [
+            {
+              rule_type = MAX
+              rule_value = 100
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 075e6780e4..bf2046ed26 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -50,8 +50,6 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -255,11 +253,7 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
         for (String CONFIG_FILE : PG_CONFIG_FILE_LIST) {
             Container.ExecResult execResult = 
container.executeJob(CONFIG_FILE);
             Assertions.assertEquals(0, execResult.getExitCode());
-            Assertions.assertTimeout(
-                    Duration.of(10, ChronoUnit.SECONDS),
-                    () ->
-                            Assertions.assertIterableEquals(
-                                    querySql(SOURCE_SQL), querySql(SINK_SQL)));
+            Assertions.assertIterableEquals(querySql(SOURCE_SQL), 
querySql(SINK_SQL));
             executeSQL("truncate table pg_e2e_sink_table");
             log.info(CONFIG_FILE + " e2e test completed");
         }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index fbe97b6d86..09924ef3f9 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -34,4 +34,6 @@ public interface TestContainer extends TestResource {
             throws IOException, InterruptedException;
 
     Container.ExecResult executeJob(String confFile) throws IOException, 
InterruptedException;
+
+    String getServerLogs();
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index e62e5ab338..a630f243a1 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -139,4 +139,9 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
         log.info("test in container: {}", identifier());
         return executeJob(jobManager, confFile);
     }
+
+    @Override
+    public String getServerLogs() {
+        return jobManager.getLogs();
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 8d1de95654..1909518838 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -143,4 +143,9 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
         log.info("test in container: {}", identifier());
         return executeJob(server, confFile);
     }
+
+    @Override
+    public String getServerLogs() {
+        return server.getLogs();
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index 88155f47fd..7d0a416dcc 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -96,4 +96,9 @@ public abstract class AbstractTestSparkContainer extends 
AbstractTestContainer {
         log.info("test in container: {}", identifier());
         return executeJob(master, confFile);
     }
+
+    @Override
+    public String getServerLogs() {
+        return master.getLogs();
+    }
 }

Reply via email to