This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b7002bfaf4 [Feature][Connector-V2] Add web3j source connector (#6598)
b7002bfaf4 is described below
commit b7002bfaf4add341ad9ec5dd148c2364b413674e
Author: ic4y <[email protected]>
AuthorDate: Sat May 11 20:19:15 2024 +0800
[Feature][Connector-V2] Add web3j source connector (#6598)
---------
Co-authored-by: Jia Fan <[email protected]>
---
docs/en/connector-v2/source/Web3j.md | 61 ++++++++++++++++
plugin-mapping.properties | 1 +
release-note.md | 1 +
seatunnel-connectors-v2/connector-web3j/pom.xml | 54 ++++++++++++++
.../connectors/seatunnel/config/Web3jConfig.java | 31 ++++++++
.../connectors/seatunnel/source/Web3jSource.java | 85 ++++++++++++++++++++++
.../seatunnel/source/Web3jSourceFactory.java | 56 ++++++++++++++
.../seatunnel/source/Web3jSourceParameter.java | 36 +++++++++
.../seatunnel/source/Web3jSourceReader.java | 83 +++++++++++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 ++
.../connector-web3j-e2e/pom.xml | 49 +++++++++++++
.../Web3jIT.java | 45 ++++++++++++
.../test/resources/firestore/web3j_to_assert.conf | 56 ++++++++++++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
15 files changed, 566 insertions(+)
diff --git a/docs/en/connector-v2/source/Web3j.md
b/docs/en/connector-v2/source/Web3j.md
new file mode 100644
index 0000000000..6e50789b41
--- /dev/null
+++ b/docs/en/connector-v2/source/Web3j.md
@@ -0,0 +1,61 @@
+# Web3j
+
+> Web3j source connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> Seatunnel Zeta<br/>
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Description
+
+Source connector for web3j. It is used to read data from the blockchain, such
as block information, transactions, smart contract events, etc. Currently, it
supports reading block height data.
+
+## Source Options
+
+| Name | Type | Required | Default |
Description |
+|------|--------|----------|---------|---------------------------------------------------------------------------------------------------------|
+| url | String | Yes | - | When using Infura as the service
provider, the URL is used for communication with the Ethereum network. |
+
+## How to Create a Http Data Synchronization Jobs
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Web3j {
+ url = "https://mainnet.infura.io/v3/xxxxx"
+ }
+}
+
+# Console printing of the read Http data
+sink {
+ Console {
+ parallelism = 1
+ }
+}
+```
+
+Then you will get the following data:
+
+```json
+{"blockNumber":19525949,"timestamp":"2024-03-27T13:28:45.605Z"}
+```
+
+## Changelog
+
+- Add Web3j Source Connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 3ea8bfc7f7..c880a8fdf2 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -105,6 +105,7 @@ seatunnel.sink.Maxcompute = connector-maxcompute
seatunnel.source.MySQL-CDC = connector-cdc-mysql
seatunnel.source.MongoDB-CDC = connector-cdc-mongodb
seatunnel.sink.S3Redshift = connector-s3-redshift
+seatunnel.source.Web3j = connector-web3j
seatunnel.source.TDengine = connector-tdengine
seatunnel.sink.TDengine = connector-tdengine
seatunnel.source.Persistiq = connector-http-persistiq
diff --git a/release-note.md b/release-note.md
index 9b41cc96d6..b799df78f7 100644
--- a/release-note.md
+++ b/release-note.md
@@ -18,6 +18,7 @@
- [Hbase] Add hbase sink connector #4049
- [Clickhouse] Fix clickhouse old version compatibility #5326
- [Easysearch] Support INFINI Easysearch #5933
+- [Web3j] add Web3j source connector #6598
### Formats
- [Canal]Support read canal format message #3950
- [Debezium]Support debezium canal format message #3981
diff --git a/seatunnel-connectors-v2/connector-web3j/pom.xml
b/seatunnel-connectors-v2/connector-web3j/pom.xml
new file mode 100644
index 0000000000..5226f51c2a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-web3j/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-web3j</artifactId>
+ <name>SeaTunnel : Connectors V2 : Web3j</name>
+
+ <properties>
+ <web3j.version>4.8.4</web3j.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.web3j</groupId>
+ <artifactId>core</artifactId>
+ <version>${web3j.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/Web3jConfig.java
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/Web3jConfig.java
new file mode 100644
index 0000000000..0147d5bf0c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/Web3jConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class Web3jConfig {
+
+ public static final Option<String> URL =
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "your infura project url like :
https://mainnet.infura.io/v3/xxxxxxxxxxxx");
+}
diff --git
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSource.java
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSource.java
new file mode 100644
index 0000000000..da18409386
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+public class Web3jSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+ private Web3jSourceParameter parameter;
+ private JobContext jobContext;
+
+ public Web3jSource(ReadonlyConfig readonlyConfig) {
+ this.parameter = new Web3jSourceParameter(readonlyConfig);
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return JobMode.BATCH.equals(jobContext.getJobMode())
+ ? Boundedness.BOUNDED
+ : Boundedness.UNBOUNDED;
+ }
+
+ @Override
+ public String getPluginName() {
+ return "Web3j";
+ }
+
+ @Override
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+ }
+
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(
+ CatalogTable.of(
+ TableIdentifier.of("Web3j", TablePath.DEFAULT),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "value",
BasicType.STRING_TYPE, 0L, true, null, ""))
+ .build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ ""));
+ }
+
+ @Override
+ public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+ SingleSplitReaderContext readerContext) throws Exception {
+ return new Web3jSourceReader(this.parameter, readerContext);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceFactory.java
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceFactory.java
new file mode 100644
index 0000000000..89c53fc89d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+import static org.apache.seatunnel.connectors.seatunnel.config.Web3jConfig.URL;
+
+@AutoService(Factory.class)
+public class Web3jSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Web3j";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(URL).build();
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return Web3jSource.class;
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new
Web3jSource(context.getOptions());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceParameter.java
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceParameter.java
new file mode 100644
index 0000000000..77aefd6dac
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceParameter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import java.io.Serializable;
+
+import static org.apache.seatunnel.connectors.seatunnel.config.Web3jConfig.URL;
+
+public class Web3jSourceParameter implements Serializable {
+ private final String url;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public Web3jSourceParameter(ReadonlyConfig config) {
+ this.url = config.get(URL);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceReader.java
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceReader.java
new file mode 100644
index 0000000000..52b9507f91
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceReader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import org.web3j.protocol.Web3j;
+import org.web3j.protocol.http.HttpService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class Web3jSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
{
+ private final Web3jSourceParameter parameter;
+ private final SingleSplitReaderContext context;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Web3j web3;
+
+ Web3jSourceReader(Web3jSourceParameter parameter, SingleSplitReaderContext
context) {
+ this.parameter = parameter;
+ this.context = context;
+ }
+
+ @Override
+ public void open() throws Exception {
+ web3 = Web3j.build(new HttpService(this.parameter.getUrl()));
+ log.info("connect Web3j server, url:[{}] ", this.parameter.getUrl());
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (web3 != null) {
+ web3.shutdown();
+ }
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ web3.ethBlockNumber()
+ .flowable()
+ .subscribe(
+ blockNumber -> {
+ Map<String, Object> data = new HashMap<>();
+ data.put("timestamp", Instant.now().toString());
+ data.put("blockNumber",
blockNumber.getBlockNumber());
+
+ String json =
OBJECT_MAPPER.writeValueAsString(data);
+
+ output.collect(new SeaTunnelRow(new Object[]
{json}));
+
+ if
(Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached
the end of the data.
+ context.signalNoMoreElement();
+ }
+ });
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index f69b592087..d1e5af9ee6 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -75,6 +75,7 @@
<module>connector-amazonsqs</module>
<module>connector-paimon</module>
<module>connector-easysearch</module>
+ <module>connector-web3j</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 6447ff43ac..41cb15cd63 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -311,6 +311,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-web3j</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-kudu</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/pom.xml
new file mode 100644
index 0000000000..913e2e4a42
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-web3j-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : Web3j</name>
+
+ <dependencies>
+ <!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-web3j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-console</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/Web3jIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/Web3jIT.java
new file mode 100644
index 0000000000..040f0f23c4
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/Web3jIT.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.google.firestore;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+@Disabled("Disabled because it needs your infura project url to run this test")
+public class Web3jIT extends TestSuiteBase implements TestResource {
+
+ private static final String FIRESTORE_CONF_FILE =
"/firestore/web3j_to_assert.conf";
+
+ @TestTemplate
+ public void testWeb3j(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
container.executeJob(FIRESTORE_CONF_FILE);
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @Override
+ public void startUp() throws Exception {}
+
+ @Override
+ public void tearDown() throws Exception {}
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/resources/firestore/web3j_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/resources/firestore/web3j_to_assert.conf
new file mode 100644
index 0000000000..95d2a2b17a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/resources/firestore/web3j_to_assert.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ Web3j {
+ url = "https://mainnet.infura.io/v3/xxxxxxx"
+ result_table_name = "web3j"
+ }
+}
+
+sink {
+ # This is a example sink plugin **only for test and demonstrate the
feature sink plugin**
+ Console {
+ source_table_name = "web3j"
+
+ }
+ Assert {
+ source_table_name = "web3j"
+ rules {
+ field_rules = [
+ {
+ field_name = value
+ field_type = String
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 45c78dbf70..477b0620d2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -57,6 +57,7 @@
<module>connector-datahub-e2e</module>
<module>connector-mongodb-e2e</module>
<module>connector-hbase-e2e</module>
+ <module>connector-web3j-e2e</module>
<module>connector-maxcompute-e2e</module>
<module>connector-google-firestore-e2e</module>
<module>connector-rocketmq-e2e</module>