This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c880b7aa4d [Improve][Connector-V2]Support multi-table sink feature for
email (#7368)
c880b7aa4d is described below
commit c880b7aa4dd6198d3b6efa1ef0780a2ce9b5ff4d
Author: corgy-w <[email protected]>
AuthorDate: Tue Aug 20 10:36:13 2024 +0800
[Improve][Connector-V2]Support multi-table sink feature for email (#7368)
---
docs/en/connector-v2/sink/Email.md | 33 +++--
docs/zh/connector-v2/sink/Email.md | 31 +++--
.../seatunnel/email/config/EmailConfig.java | 12 +-
.../seatunnel/email/config/EmailSinkConfig.java | 46 +++----
.../connectors/seatunnel/email/sink/EmailSink.java | 38 +++---
.../seatunnel/email/sink/EmailSinkFactory.java | 11 ++
.../seatunnel/email/sink/EmailSinkWriter.java | 58 ++++----
.../connector-email-e2e/pom.xml | 37 ++++++
.../e2e/connector/email/EmailWithMultiIT.java | 146 +++++++++++++++++++++
.../src/test/resources/fake_to_email.conf | 63 +++++++++
.../src/test/resources/fake_to_email_test.conf | 62 +++++++++
.../src/test/resources/fake_to_multiemailsink.conf | 83 ++++++++++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
13 files changed, 520 insertions(+), 101 deletions(-)
diff --git a/docs/en/connector-v2/sink/Email.md
b/docs/en/connector-v2/sink/Email.md
index f2bca2783d..444be57292 100644
--- a/docs/en/connector-v2/sink/Email.md
+++ b/docs/en/connector-v2/sink/Email.md
@@ -14,25 +14,26 @@ The tested email version is 1.5.6.
## Options
-| name | type | required | default value |
-|--------------------------|--------|----------|---------------|
-| email_from_address | string | yes | - |
-| email_to_address | string | yes | - |
-| email_host | string | yes | - |
-| email_transport_protocol | string | yes | - |
-| email_smtp_auth | string | yes | - |
-| email_authorization_code | string | yes | - |
-| email_message_headline | string | yes | - |
-| email_message_content | string | yes | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|--------------------------|---------|----------|---------------|
+| email_from_address | string | yes | - |
+| email_to_address | string | yes | - |
+| email_host | string | yes | - |
+| email_transport_protocol | string | yes | - |
+| email_smtp_auth | boolean | yes | - |
+| email_smtp_port | int | no | 465 |
+| email_authorization_code | string | no | - |
+| email_message_headline | string | yes | - |
+| email_message_content | string | yes | - |
+| common-options | | no | - |
### email_from_address [string]
-Sender Email Address .
+Sender Email Address.
### email_to_address [string]
-Address to receive mail.
+Address to receive mail. Multiple email addresses are supported, separated by
commas (,).
### email_host [string]
@@ -42,10 +43,14 @@ SMTP server to connect to.
The protocol to load the session .
-### email_smtp_auth [string]
+### email_smtp_auth [boolean]
Whether to authenticate the customer.
+### email_smtp_port [int]
+
+The port of the SMTP server to connect to. The default value is 465.
+
### email_authorization_code [string]
Authorization code. You can obtain the authorization code from the mailbox
settings.
diff --git a/docs/zh/connector-v2/sink/Email.md
b/docs/zh/connector-v2/sink/Email.md
index cc3999c580..a254dc4608 100644
--- a/docs/zh/connector-v2/sink/Email.md
+++ b/docs/zh/connector-v2/sink/Email.md
@@ -16,17 +16,18 @@
## 选项
-| 名称 | 类型 | 是否必须 | 默认值 |
-|--------------------------|--------|------|-----|
-| email_from_address | string | 是 | - |
-| email_to_address | string | 是 | - |
-| email_host | string | 是 | - |
-| email_transport_protocol | string | 是 | - |
-| email_smtp_auth | string | 是 | - |
-| email_authorization_code | string | 是 | - |
-| email_message_headline | string | 是 | - |
-| email_message_content | string | 是 | - |
-| common-options | | 否 | - |
+| 名称 | 类型 | 是否必须 | 默认值 |
+|--------------------------|---------|------|-----|
+| email_from_address | string | 是 | - |
+| email_to_address | string | 是 | - |
+| email_host | string | 是 | - |
+| email_transport_protocol | string | 是 | - |
+| email_smtp_auth | boolean | 是 | - |
+| email_smtp_port | int | 否 | 465 |
+| email_authorization_code | string | 否 | - |
+| email_message_headline | string | 是 | - |
+| email_message_content | string | 是 | - |
+| common-options | | 否 | - |
### email_from_address [string]
@@ -34,7 +35,7 @@
### email_to_address [string]
-接收邮件的地址
+接收邮件的地址,支持多个邮箱地址,以逗号(,)分隔。
### email_host [string]
@@ -44,10 +45,14 @@
加载会话的协议
-### email_smtp_auth [string]
+### email_smtp_auth [boolean]
是否对客户进行认证
+### email_smtp_port [int]
+
+要连接的 SMTP 服务器端口,默认值为 465。
+
### email_authorization_code [string]
授权码,您可以从邮箱设置中获取授权码
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java
index 03825804d8..406de844df 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailConfig.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.configuration.Options;
public class EmailConfig {
+ public static final String CONNECTOR_IDENTITY = "EmailSink";
+
public static final Option<String> EMAIL_FROM_ADDRESS =
Options.key("email_from_address")
.stringType()
@@ -61,9 +63,15 @@ public class EmailConfig {
.stringType()
.noDefaultValue()
.withDescription("The protocol used to send the message");
- public static final Option<String> EMAIL_SMTP_AUTH =
+ public static final Option<Boolean> EMAIL_SMTP_AUTH =
Options.key("email_smtp_auth")
- .stringType()
+ .booleanType()
.noDefaultValue()
.withDescription("Whether to use SMTP authentication");
+
+ public static final Option<Integer> EMAIL_SMTP_PORT =
+ Options.key("email_smtp_port")
+ .intType()
+ .defaultValue(465)
+ .withDescription("Select port for authentication.");
}
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
index f876fab37c..8455a2ed2c 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java
@@ -17,22 +17,25 @@
package org.apache.seatunnel.connectors.seatunnel.email.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Data;
import lombok.NonNull;
+import java.io.Serializable;
+
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_AUTHORIZATION_CODE;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_FROM_ADDRESS;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_HOST;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_MESSAGE_CONTENT;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_MESSAGE_HEADLINE;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_SMTP_AUTH;
+import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_SMTP_PORT;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_TO_ADDRESS;
import static
org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig.EMAIL_TRANSPORT_PROTOCOL;
@Data
-public class EmailSinkConfig {
+public class EmailSinkConfig implements Serializable {
private String emailFromAddress;
private String emailToAddress;
private String emailAuthorizationCode;
@@ -40,32 +43,19 @@ public class EmailSinkConfig {
private String emailMessageContent;
private String emailHost;
private String emailTransportProtocol;
- private String emailSmtpAuth;
+ private Boolean emailSmtpAuth;
+ private Integer emailSmtpPort;
- public EmailSinkConfig(@NonNull Config pluginConfig) {
- if (pluginConfig.hasPath(EMAIL_FROM_ADDRESS.key())) {
- this.emailFromAddress =
pluginConfig.getString(EMAIL_FROM_ADDRESS.key());
- }
- if (pluginConfig.hasPath(EMAIL_TO_ADDRESS.key())) {
- this.emailToAddress =
pluginConfig.getString(EMAIL_TO_ADDRESS.key());
- }
- if (pluginConfig.hasPath(EMAIL_AUTHORIZATION_CODE.key())) {
- this.emailAuthorizationCode =
pluginConfig.getString(EMAIL_AUTHORIZATION_CODE.key());
- }
- if (pluginConfig.hasPath(EMAIL_MESSAGE_HEADLINE.key())) {
- this.emailMessageHeadline =
pluginConfig.getString(EMAIL_MESSAGE_HEADLINE.key());
- }
- if (pluginConfig.hasPath(EMAIL_MESSAGE_CONTENT.key())) {
- this.emailMessageContent =
pluginConfig.getString(EMAIL_MESSAGE_CONTENT.key());
- }
- if (pluginConfig.hasPath(EMAIL_HOST.key())) {
- this.emailHost = pluginConfig.getString(EMAIL_HOST.key());
- }
- if (pluginConfig.hasPath(EMAIL_TRANSPORT_PROTOCOL.key())) {
- this.emailTransportProtocol =
pluginConfig.getString(EMAIL_TRANSPORT_PROTOCOL.key());
- }
- if (pluginConfig.hasPath(EMAIL_SMTP_AUTH.key())) {
- this.emailSmtpAuth = pluginConfig.getString(EMAIL_SMTP_AUTH.key());
- }
+ public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
+ super();
+ this.emailFromAddress = pluginConfig.get(EMAIL_FROM_ADDRESS);
+ this.emailToAddress = pluginConfig.get(EMAIL_TO_ADDRESS);
+ this.emailAuthorizationCode =
pluginConfig.get(EMAIL_AUTHORIZATION_CODE);
+ this.emailMessageHeadline = pluginConfig.get(EMAIL_MESSAGE_HEADLINE);
+ this.emailMessageContent = pluginConfig.get(EMAIL_MESSAGE_CONTENT);
+ this.emailHost = pluginConfig.get(EMAIL_HOST);
+ this.emailTransportProtocol =
pluginConfig.get(EMAIL_TRANSPORT_PROTOCOL);
+ this.emailSmtpAuth = pluginConfig.get(EMAIL_SMTP_AUTH);
+ this.emailSmtpPort = pluginConfig.get(EMAIL_SMTP_PORT);
}
}
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
index c1b8ffdd37..0a3df90a12 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
@@ -17,40 +17,38 @@
package org.apache.seatunnel.connectors.seatunnel.email.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig;
+import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
-@AutoService(SeaTunnelSink.class)
-public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
- private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ private ReadonlyConfig readonlyConfig;
+ private CatalogTable catalogTable;
+ private EmailSinkConfig pluginConfig;
+
+ public EmailSink(ReadonlyConfig config, CatalogTable table) {
+ this.readonlyConfig = config;
+ this.catalogTable = table;
+ this.pluginConfig = new EmailSinkConfig(config);
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}
@Override
- public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
+ public EmailSinkWriter createWriter(SinkWriter.Context context) {
return new EmailSinkWriter(seaTunnelRowType, pluginConfig);
}
@Override
public String getPluginName() {
- return "EmailSink";
- }
-
- @Override
- public void prepare(Config pluginConfig) {
- this.pluginConfig = pluginConfig;
+ return EmailConfig.CONNECTOR_IDENTITY;
}
}
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java
index 243985ff6f..8db3e42cac 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkFactory.java
@@ -18,8 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.email.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import com.google.auto.service.AutoService;
@@ -39,6 +43,12 @@ public class EmailSinkFactory implements TableSinkFactory {
return "EmailSink";
}
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new EmailSink(context.getOptions(), catalogTable);
+ }
+
@Override
public OptionRule optionRule() {
return OptionRule.builder()
@@ -51,6 +61,7 @@ public class EmailSinkFactory implements TableSinkFactory {
EMAIL_AUTHORIZATION_CODE,
EMAIL_MESSAGE_HEADLINE,
EMAIL_MESSAGE_CONTENT)
+ .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
index ebcb5f9041..f7fe04c068 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.email.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
@@ -33,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
import javax.activation.DataHandler;
import javax.activation.DataSource;
import javax.activation.FileDataSource;
+import javax.mail.Address;
import javax.mail.Authenticator;
import javax.mail.BodyPart;
import javax.mail.Message;
@@ -51,15 +51,16 @@ import java.io.IOException;
import java.util.Properties;
@Slf4j
-public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
private final SeaTunnelRowType seaTunnelRowType;
- private EmailSinkConfig config;
+ private final EmailSinkConfig config;
private StringBuffer stringBuffer;
- public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, Config
pluginConfig) {
+ public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, EmailSinkConfig
pluginConfig) {
this.seaTunnelRowType = seaTunnelRowType;
- this.config = new EmailSinkConfig(pluginConfig);
+ this.config = pluginConfig;
this.stringBuffer = new StringBuffer();
}
@@ -78,29 +79,32 @@ public class EmailSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
public void close() {
createFile();
Properties properties = new Properties();
-
properties.setProperty("mail.host", config.getEmailHost());
-
properties.setProperty("mail.transport.protocol",
config.getEmailTransportProtocol());
-
- properties.setProperty("mail.smtp.auth", config.getEmailSmtpAuth());
+ properties.setProperty("mail.smtp.auth",
config.getEmailSmtpAuth().toString());
+ properties.setProperty("mail.smtp.port",
config.getEmailSmtpPort().toString());
try {
MailSSLSocketFactory sf = new MailSSLSocketFactory();
sf.setTrustAllHosts(true);
- properties.put("mail.smtp.ssl.enable", "true");
properties.put("mail.smtp.ssl.socketFactory", sf);
- Session session =
- Session.getDefaultInstance(
- properties,
- new Authenticator() {
- @Override
- protected PasswordAuthentication
getPasswordAuthentication() {
- return new PasswordAuthentication(
- config.getEmailFromAddress(),
-
config.getEmailAuthorizationCode());
- }
- });
+ Session session;
+ if (config.getEmailSmtpAuth()) {
+ properties.put("mail.smtp.ssl.enable", "true");
+ session =
+ Session.getDefaultInstance(
+ properties,
+ new Authenticator() {
+ @Override
+ protected PasswordAuthentication
getPasswordAuthentication() {
+ return new PasswordAuthentication(
+ config.getEmailFromAddress(),
+
config.getEmailAuthorizationCode());
+ }
+ });
+ } else {
+ session = Session.getDefaultInstance(properties);
+ }
// Create the default MimeMessage object
MimeMessage message = new MimeMessage(session);
@@ -108,8 +112,14 @@ public class EmailSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
message.setFrom(new InternetAddress(config.getEmailFromAddress()));
// Set the recipient email address
- message.addRecipient(
- Message.RecipientType.TO, new
InternetAddress(config.getEmailToAddress()));
+ String[] emailAddresses = config.getEmailToAddress().split(",");
+ Address[] addresses = new Address[emailAddresses.length];
+ for (int i = 0; i < emailAddresses.length; i++) {
+ addresses[i] = new InternetAddress(emailAddresses[i]);
+ }
+ if (addresses.length > 0) {
+ message.setRecipients(Message.RecipientType.TO, addresses);
+ }
// Setting the Email subject
message.setSubject(config.getEmailMessageHeadline());
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/pom.xml
new file mode 100644
index 0000000000..7a6552989a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-email-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : Email</name>
+
+ <dependencies>
+ <!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-email</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
new file mode 100644
index 0000000000..2822231874
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/java/org/apache/seatunnel/e2e/connector/email/EmailWithMultiIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.email;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.mail.Flags;
+import javax.mail.Folder;
+import javax.mail.Message;
+import javax.mail.Session;
+import javax.mail.Store;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+@Slf4j
+public class EmailWithMultiIT extends TestSuiteBase implements TestResource {
+ private static final String IMAGE = "greenmail/standalone";
+ private static final String HOST = "email-e2e";
+ private static final int STMP_PORT = 3025;
+ private static final int IMAP_PORT = 3143;
+
+ private GenericContainer<?> smtpContainer;
+
+ @BeforeAll
+ @Override
+ public void startUp() {
+ this.smtpContainer =
+ new GenericContainer<>(DockerImageName.parse(IMAGE))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withExposedPorts(STMP_PORT, IMAP_PORT)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(LoggerFactory.getLogger("email-service")));
+ Startables.deepStart(Stream.of(smtpContainer)).join();
+ log.info("SMTP container started");
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ if (smtpContainer != null) {
+ smtpContainer.stop();
+ }
+ }
+
+ @TestTemplate
+ public void testEmailSink(TestContainer container) throws Exception {
+ Container.ExecResult textWriteResult =
container.executeJob("/fake_to_email.conf");
+ testEMailSuccess(1, "[email protected]",
"[email protected]");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ }
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.FLINK},
+ disabledReason = "Currently FLINK do not support multi-table")
+ public void testMultipleTableEmailSink(TestContainer container) throws
Exception {
+ Container.ExecResult textWriteResult =
container.executeJob("/fake_to_multiemailsink.conf");
+ testEMailSuccess(2, "[email protected]",
"[email protected]");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ }
+
+ private Session setupImap() {
+ log.info("in setupImap");
+ Properties props = new Properties();
+ props.setProperty("mail.store.protocol", "imap");
+ props.put("mail.imap.host", smtpContainer.getHost());
+ props.put("mail.imap.port", smtpContainer.getMappedPort(IMAP_PORT));
+ props.put("mail.imap.localaddress", smtpContainer.getHost());
+ return Session.getInstance(props, null);
+ }
+
+ private void testEMailSuccess(int receivedNum, String... users) throws
Exception {
+ Session sessionIMAP = setupImap();
+ for (String user : users) {
+ Store store = sessionIMAP.getStore("imap");
+ store.connect(
+ smtpContainer.getHost(),
smtpContainer.getMappedPort(IMAP_PORT), user, "");
+ if (store.isConnected()) {
+ log.info("IMAP is connected");
+ Folder folder = store.getFolder("INBOX");
+ if (folder != null) {
+ // Open the folder in read/write mode
+ folder.open(Folder.READ_WRITE);
+
+ Message[] messages = folder.getMessages();
+ int unreadCount = 0;
+
+ for (Message message : messages) {
+ // Process only unread mail
+ if (!message.isSet(Flags.Flag.SEEN)) {
+ unreadCount++;
+ // Mark as read
+ message.setFlag(Flags.Flag.SEEN, true);
+ }
+ }
+
+ log.info("mail messages.length: {}", unreadCount);
+ Assertions.assertEquals(receivedNum, unreadCount);
+ }
+ } else {
+ log.info("IMAP is not connected");
+ }
+ }
+ }
+
+ @Disabled("Email authentication address and authentication information
need to be configured")
+ public void testOwnEmailSink(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult textReadResult =
container.executeJob("/fake_to_email_test.conf");
+ Assertions.assertEquals(0, textReadResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email.conf
new file mode 100644
index 0000000000..d69b83fd28
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ row.num = 100
+ schema = {
+ table = "test.table1"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "age"
+ type = "int"
+ }
+ ]
+ }
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ EmailSink {
+ email_from_address = "[email protected]"
+ email_to_address = "[email protected],[email protected]"
+ email_host = "email-e2e"
+ email_transport_protocol = "smtp"
+ email_smtp_auth = "false"
+ email_smtp_port = 3025
+ email_authorization_code=""
+ email_message_headline = "test-title"
+ email_message_content = "test-content"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email_test.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email_test.conf
new file mode 100644
index 0000000000..b3657cd7a7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_email_test.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ row.num = 100
+ schema = {
+ table = "test.table1"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "age"
+ type = "int"
+ }
+ ]
+ }
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ EmailSink {
+ email_from_address = "[email protected]"
+ email_to_address = "[email protected]"
+ email_host="smtp.qq.com"
+ email_transport_protocol="smtp"
+ email_smtp_auth="true"
+ email_authorization_code="you authorization code"
+ email_message_headline="test-title"
+ email_message_content="test-content"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_multiemailsink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_multiemailsink.conf
new file mode 100644
index 0000000000..974ad1cbb2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-email-e2e/src/test/resources/fake_to_multiemailsink.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ row.num = 100
+ schema = {
+ table = "test.table1"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "age"
+ type = "int"
+ }
+ ]
+ }
+ },
+ {
+ row.num = 100
+ schema = {
+ table = "test.table2"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "name"
+ type = "string"
+ },
+ {
+ name = "age"
+ type = "int"
+ }
+ ]
+ }
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ EmailSink {
+ email_from_address = "[email protected]"
+ email_to_address = "[email protected],[email protected]"
+ email_host = "email-e2e"
+ email_transport_protocol = "smtp"
+ email_smtp_auth = false
+ email_smtp_port = 3025
+ email_authorization_code=""
+ email_message_headline = "test-title"
+ email_message_content = "test-content"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index ed36310474..4933ab0205 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -77,6 +77,7 @@
<module>connector-milvus-e2e</module>
<module>connector-activemq-e2e</module>
<module>connector-sls-e2e</module>
+ <module>connector-email-e2e</module>
</modules>
<dependencies>