This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fea4b6f268 [Fix] Fix log error when multi-table sink close (#5683)
fea4b6f268 is described below
commit fea4b6f268e2229bcd31fae02dc684cdef8fe259
Author: Jia Fan <[email protected]>
AuthorDate: Wed Oct 25 18:10:07 2023 +0800
[Fix] Fix log error when multi-table sink close (#5683)
---
.../seatunnel/assertion/sink/AssertSink.java | 4 +-
.../seatunnel/assertion/sink/AssertSinkWriter.java | 4 +-
.../multitablesink/MultiTableSinkWriter.java | 17 +++++
.../multitablesink/MultiTableWriterRunnable.java | 5 ++
.../e2e/connector/fake/FakeWithMultiTableTT.java | 12 +++
.../fake_to_assert_with_multitable_exception.conf | 85 ++++++++++++++++++++++
.../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 8 +-
.../e2e/common/container/TestContainer.java | 2 +
.../flink/AbstractTestFlinkContainer.java | 5 ++
.../container/seatunnel/SeaTunnelContainer.java | 5 ++
.../spark/AbstractTestSparkContainer.java | 5 ++
11 files changed, 143 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 97106bea02..d65398c9de 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -50,7 +51,8 @@ import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
@AutoService(SeaTunnelSink.class)
-public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
private SeaTunnelRowType seaTunnelRowType;
private CatalogTable catalogTable;
private List<AssertFieldRule> assertFieldRules;
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
index c0bc0d9718..7d7968ad00 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkWriter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.assertion.excecutor.AssertExecutor;
@@ -33,7 +34,8 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.LongAccumulator;
-public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class AssertSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
private final SeaTunnelRowType seaTunnelRowType;
private final List<AssertFieldRule> assertFieldRules;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
index 765f58785c..fad9c2ffa9 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
@@ -156,6 +156,7 @@ public class MultiTableSinkWriter
@Override
public List<MultiTableState> snapshotState(long checkpointId) throws
IOException {
+ checkQueueRemain();
subSinkErrorCheck();
List<MultiTableState> multiTableStates = new ArrayList<>();
MultiTableState multiTableState = new MultiTableState(new HashMap<>());
@@ -174,6 +175,7 @@ public class MultiTableSinkWriter
@Override
public Optional<MultiTableCommitInfo> prepareCommit() throws IOException {
+ checkQueueRemain();
subSinkErrorCheck();
MultiTableCommitInfo multiTableCommitInfo = new
MultiTableCommitInfo(new HashMap<>());
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
@@ -194,6 +196,7 @@ public class MultiTableSinkWriter
@Override
public void abortPrepare() {
+ checkQueueRemain();
Throwable firstE = null;
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
synchronized (runnable.get(i)) {
@@ -217,6 +220,7 @@ public class MultiTableSinkWriter
@Override
public void close() throws IOException {
+ checkQueueRemain();
executorService.shutdownNow();
Throwable firstE = null;
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
@@ -243,4 +247,17 @@ public class MultiTableSinkWriter
throw new RuntimeException(firstE);
}
}
+
+ private void checkQueueRemain() {
+ try {
+ for (BlockingQueue<SeaTunnelRow> blockingQueue : blockingQueues) {
+ while (!blockingQueue.isEmpty()) {
+ Thread.sleep(100);
+ subSinkErrorCheck();
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
index f5895c374e..1fa681f0fa 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
@@ -61,6 +61,11 @@ public class MultiTableWriterRunnable implements Runnable {
synchronized (this) {
writer.write(row);
}
+ } catch (InterruptedException e) {
+ // When the job finished, the thread will be interrupted, so
we ignore this
+ // exception.
+ throwable = e;
+ break;
} catch (Exception e) {
log.error("MultiTableWriterRunnable error", e);
throwable = e;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithMultiTableTT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithMultiTableTT.java
index 1f41385b5a..2825f9d7da 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithMultiTableTT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeWithMultiTableTT.java
@@ -38,6 +38,18 @@ public class FakeWithMultiTableTT extends TestSuiteBase {
throws IOException, InterruptedException {
Container.ExecResult fakeWithTableNames =
container.executeJob("/fake_to_console_with_multitable_mode.conf");
+ Assertions.assertFalse(
+ container.getServerLogs().contains("MultiTableWriterRunnable
error"));
Assertions.assertEquals(0, fakeWithTableNames.getExitCode());
+
+ Container.ExecResult fakeWithException =
+
container.executeJob("/fake_to_assert_with_multitable_exception.conf");
+
Assertions.assertTrue(container.getServerLogs().contains("MultiTableWriterRunnable
error"));
+ Assertions.assertTrue(
+ container
+ .getServerLogs()
+ .contains(
+ "at
org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.checkQueueRemain(MultiTableSinkWriter.java"));
+ Assertions.assertEquals(1, fakeWithException.getExitCode());
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_multitable_exception.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_multitable_exception.conf
new file mode 100644
index 0000000000..d4af80d024
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_with_multitable_exception.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 200]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ field_rules = [
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = MAX
+ rule_value = 100
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 075e6780e4..bf2046ed26 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -50,8 +50,6 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -255,11 +253,7 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
for (String CONFIG_FILE : PG_CONFIG_FILE_LIST) {
Container.ExecResult execResult =
container.executeJob(CONFIG_FILE);
Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertTimeout(
- Duration.of(10, ChronoUnit.SECONDS),
- () ->
- Assertions.assertIterableEquals(
- querySql(SOURCE_SQL), querySql(SINK_SQL)));
+ Assertions.assertIterableEquals(querySql(SOURCE_SQL),
querySql(SINK_SQL));
executeSQL("truncate table pg_e2e_sink_table");
log.info(CONFIG_FILE + " e2e test completed");
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index fbe97b6d86..09924ef3f9 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -34,4 +34,6 @@ public interface TestContainer extends TestResource {
throws IOException, InterruptedException;
Container.ExecResult executeJob(String confFile) throws IOException,
InterruptedException;
+
+ String getServerLogs();
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index e62e5ab338..a630f243a1 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -139,4 +139,9 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
log.info("test in container: {}", identifier());
return executeJob(jobManager, confFile);
}
+
+ @Override
+ public String getServerLogs() {
+ return jobManager.getLogs();
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 8d1de95654..1909518838 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -143,4 +143,9 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
log.info("test in container: {}", identifier());
return executeJob(server, confFile);
}
+
+ @Override
+ public String getServerLogs() {
+ return server.getLogs();
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index 88155f47fd..7d0a416dcc 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -96,4 +96,9 @@ public abstract class AbstractTestSparkContainer extends
AbstractTestContainer {
log.info("test in container: {}", identifier());
return executeJob(master, confFile);
}
+
+ @Override
+ public String getServerLogs() {
+ return master.getLogs();
+ }
}