This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b7002bfaf4 [Feature][Connector-V2] Add web3j source connector (#6598)
b7002bfaf4 is described below

commit b7002bfaf4add341ad9ec5dd148c2364b413674e
Author: ic4y <[email protected]>
AuthorDate: Sat May 11 20:19:15 2024 +0800

    [Feature][Connector-V2] Add web3j source connector (#6598)
    
    
    
    ---------
    
    Co-authored-by: Jia Fan <[email protected]>
---
 docs/en/connector-v2/source/Web3j.md               | 61 ++++++++++++++++
 plugin-mapping.properties                          |  1 +
 release-note.md                                    |  1 +
 seatunnel-connectors-v2/connector-web3j/pom.xml    | 54 ++++++++++++++
 .../connectors/seatunnel/config/Web3jConfig.java   | 31 ++++++++
 .../connectors/seatunnel/source/Web3jSource.java   | 85 ++++++++++++++++++++++
 .../seatunnel/source/Web3jSourceFactory.java       | 56 ++++++++++++++
 .../seatunnel/source/Web3jSourceParameter.java     | 36 +++++++++
 .../seatunnel/source/Web3jSourceReader.java        | 83 +++++++++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |  1 +
 seatunnel-dist/pom.xml                             |  6 ++
 .../connector-web3j-e2e/pom.xml                    | 49 +++++++++++++
 .../Web3jIT.java                                   | 45 ++++++++++++
 .../test/resources/firestore/web3j_to_assert.conf  | 56 ++++++++++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |  1 +
 15 files changed, 566 insertions(+)

diff --git a/docs/en/connector-v2/source/Web3j.md 
b/docs/en/connector-v2/source/Web3j.md
new file mode 100644
index 0000000000..6e50789b41
--- /dev/null
+++ b/docs/en/connector-v2/source/Web3j.md
@@ -0,0 +1,61 @@
+# Web3j
+
+> Web3j source connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Description
+
+Source connector for web3j. It is used to read data from the blockchain, such as block information, transactions, smart contract events, etc. Currently, it supports reading block height data.
+
+## Source Options
+
+| Name |  Type  | Required | Default |                                               Description                                               |
+|------|--------|----------|---------|---------------------------------------------------------------------------------------------------------|
+| url  | String | Yes      | -       | When using Infura as the service provider, the URL is used for communication with the Ethereum network. |
+
+## How to Create a Web3j Data Synchronization Job
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Web3j {
+    url = "https://mainnet.infura.io/v3/xxxxx"
+  }
+}
+
+# Print the block data read by the Web3j source to the console
+sink {
+  Console {
+    parallelism = 1
+  }
+}
+```
+
+Then you will get the following data:
+
+```json
+{"blockNumber":19525949,"timestamp":"2024-03-27T13:28:45.605Z"}
+```
+
+## Changelog
+
+- Add Web3j Source Connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 3ea8bfc7f7..c880a8fdf2 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -105,6 +105,7 @@ seatunnel.sink.Maxcompute = connector-maxcompute
 seatunnel.source.MySQL-CDC = connector-cdc-mysql
 seatunnel.source.MongoDB-CDC = connector-cdc-mongodb
 seatunnel.sink.S3Redshift = connector-s3-redshift
+seatunnel.source.Web3j = connector-web3j
 seatunnel.source.TDengine = connector-tdengine
 seatunnel.sink.TDengine = connector-tdengine
 seatunnel.source.Persistiq = connector-http-persistiq
diff --git a/release-note.md b/release-note.md
index 9b41cc96d6..b799df78f7 100644
--- a/release-note.md
+++ b/release-note.md
@@ -18,6 +18,7 @@
 - [Hbase] Add hbase sink connector #4049
 - [Clickhouse] Fix clickhouse old version compatibility #5326
 - [Easysearch] Support INFINI Easysearch #5933
+- [Web3j] add Web3j source connector #6598
 ### Formats
 - [Canal]Support read canal format message #3950
 - [Debezium]Support debezium canal format message #3981
diff --git a/seatunnel-connectors-v2/connector-web3j/pom.xml 
b/seatunnel-connectors-v2/connector-web3j/pom.xml
new file mode 100644
index 0000000000..5226f51c2a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-web3j/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-web3j</artifactId>
+    <name>SeaTunnel : Connectors V2 : Web3j</name>
+
+    <properties>
+        <web3j.version>4.8.4</web3j.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.web3j</groupId>
+            <artifactId>core</artifactId>
+            <version>${web3j.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/Web3jConfig.java
 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/Web3jConfig.java
new file mode 100644
index 0000000000..0147d5bf0c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/config/Web3jConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/** Configuration options accepted by the Web3j source connector. */
+public class Web3jConfig {
+
+    /**
+     * URL of the endpoint the connector talks to (for example an Infura project URL). The option
+     * is a string and declares no default value.
+     */
+    public static final Option<String> URL =
+            Options.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "your infura project url like : https://mainnet.infura.io/v3/xxxxxxxxxxxx");
+}
diff --git 
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSource.java
 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSource.java
new file mode 100644
index 0000000000..da18409386
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Single-split SeaTunnel source registered under the plugin name "Web3j". It advertises one
+ * catalog table with a single string column called {@code value} and hands its parameters to a
+ * {@link Web3jSourceReader}.
+ */
+public class Web3jSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+    private Web3jSourceParameter parameter;
+    private JobContext jobContext;
+
+    /** Captures the connector parameters from the supplied read-only configuration. */
+    public Web3jSource(ReadonlyConfig readonlyConfig) {
+        this.parameter = new Web3jSourceParameter(readonlyConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Web3j";
+    }
+
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+    }
+
+    /** Bounded when the job context reports batch mode, unbounded for every other mode. */
+    @Override
+    public Boundedness getBoundedness() {
+        if (JobMode.BATCH.equals(jobContext.getJobMode())) {
+            return Boundedness.BOUNDED;
+        }
+        return Boundedness.UNBOUNDED;
+    }
+
+    /** Describes the single produced table: identifier "Web3j" with one {@code value} column. */
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        PhysicalColumn valueColumn =
+                PhysicalColumn.of("value", BasicType.STRING_TYPE, 0L, true, null, "");
+        TableSchema schema = TableSchema.builder().column(valueColumn).build();
+        CatalogTable table =
+                CatalogTable.of(
+                        TableIdentifier.of("Web3j", TablePath.DEFAULT),
+                        schema,
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "");
+        return Collections.singletonList(table);
+    }
+
+    /** Creates the reader, passing on this source's parameters and the given reader context. */
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+            SingleSplitReaderContext readerContext) throws Exception {
+        return new Web3jSourceReader(this.parameter, readerContext);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceFactory.java
 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceFactory.java
new file mode 100644
index 0000000000..89c53fc89d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+import static org.apache.seatunnel.connectors.seatunnel.config.Web3jConfig.URL;
+
+@AutoService(Factory.class)
+public class Web3jSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Web3j";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(URL).build();
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return Web3jSource.class;
+    }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
Web3jSource(context.getOptions());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceParameter.java
 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceParameter.java
new file mode 100644
index 0000000000..77aefd6dac
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceParameter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import java.io.Serializable;
+
+import static org.apache.seatunnel.connectors.seatunnel.config.Web3jConfig.URL;
+
+/** Serializable holder for the Web3j source settings, read once from the job configuration. */
+public class Web3jSourceParameter implements Serializable {
+    private final String url;
+
+    /** Reads the {@code url} option from the given configuration. */
+    public Web3jSourceParameter(ReadonlyConfig config) {
+        url = config.get(URL);
+    }
+
+    /** @return the configured endpoint URL */
+    public String getUrl() {
+        return this.url;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceReader.java
 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceReader.java
new file mode 100644
index 0000000000..52b9507f91
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-web3j/src/main/java/org/apache/seatunnel/connectors/seatunnel/source/Web3jSourceReader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.source;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import org.web3j.protocol.Web3j;
+import org.web3j.protocol.core.methods.response.EthBlockNumber;
+import org.web3j.protocol.http.HttpService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Reader of the Web3j source. Every {@link #pollNext(Collector)} call requests the current block
+ * number and emits one row whose single field is a JSON string with the keys {@code timestamp}
+ * and {@code blockNumber}.
+ */
+@Slf4j
+public class Web3jSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+    private final Web3jSourceParameter parameter;
+    private final SingleSplitReaderContext context;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private Web3j web3;
+
+    Web3jSourceReader(Web3jSourceParameter parameter, SingleSplitReaderContext context) {
+        this.parameter = parameter;
+        this.context = context;
+    }
+
+    /** Builds the Web3j client on top of an HTTP service pointing at the configured URL. */
+    @Override
+    public void open() throws Exception {
+        web3 = Web3j.build(new HttpService(this.parameter.getUrl()));
+        log.info("connect Web3j server, url:[{}] ", this.parameter.getUrl());
+    }
+
+    /** Shuts the client down if {@link #open()} created one. */
+    @Override
+    public void close() throws IOException {
+        if (web3 != null) {
+            web3.shutdown();
+        }
+    }
+
+    /**
+     * Fetches the current block number and emits it as one JSON row.
+     *
+     * <p>The request is sent synchronously so that transport failures, JSON-RPC error responses
+     * and serialization failures propagate to the caller. A {@code flowable().subscribe(onNext)}
+     * without an error consumer would hand such failures to RxJava's global error hook instead,
+     * and in bounded mode the end-of-data signal would then never be sent.
+     *
+     * @param output collector receiving the produced row
+     * @throws IOException if the request fails or the node answers with a JSON-RPC error
+     * @throws Exception if the row cannot be serialized to JSON
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        EthBlockNumber response = web3.ethBlockNumber().send();
+        if (response.hasError()) {
+            // The URL is deliberately left out of the message; it may embed a project key.
+            throw new IOException(
+                    "Web3j eth_blockNumber request failed, code: "
+                            + response.getError().getCode()
+                            + ", message: "
+                            + response.getError().getMessage());
+        }
+
+        Map<String, Object> data = new HashMap<>();
+        data.put("timestamp", Instant.now().toString());
+        data.put("blockNumber", response.getBlockNumber());
+
+        String json = OBJECT_MAPPER.writeValueAsString(data);
+        output.collect(new SeaTunnelRow(new Object[] {json}));
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            context.signalNoMoreElement();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index f69b592087..d1e5af9ee6 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -75,6 +75,7 @@
         <module>connector-amazonsqs</module>
         <module>connector-paimon</module>
         <module>connector-easysearch</module>
+        <module>connector-web3j</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 6447ff43ac..41cb15cd63 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -311,6 +311,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-web3j</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
                     <artifactId>connector-kudu</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/pom.xml
new file mode 100644
index 0000000000..913e2e4a42
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-web3j-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Web3j</name>
+
+    <dependencies>
+        <!-- SeaTunnel connectors -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-web3j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/Web3jIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/Web3jIT.java
new file mode 100644
index 0000000000..040f0f23c4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/Web3jIT.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.google.firestore;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+@Disabled("Disabled because it needs your infura project url to run this test")
+public class Web3jIT extends TestSuiteBase implements TestResource {
+
+    private static final String FIRESTORE_CONF_FILE = 
"/firestore/web3j_to_assert.conf";
+
+    @TestTemplate
+    public void testWeb3j(TestContainer container) throws Exception {
+        Container.ExecResult execResult = 
container.executeJob(FIRESTORE_CONF_FILE);
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @Override
+    public void startUp() throws Exception {}
+
+    @Override
+    public void tearDown() throws Exception {}
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/resources/firestore/web3j_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/resources/firestore/web3j_to_assert.conf
new file mode 100644
index 0000000000..95d2a2b17a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-web3j-e2e/src/test/resources/firestore/web3j_to_assert.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  Web3j {
+   url = "https://mainnet.infura.io/v3/xxxxxxx"
+   result_table_name = "web3j"
+  }
+}
+
+sink {
+    # This is an example sink plugin **only for testing and demonstrating the sink plugin feature**
+    Console {
+    source_table_name = "web3j"
+
+    }
+  Assert {
+    source_table_name = "web3j"
+    rules {
+      field_rules = [
+        {
+          field_name = value
+          field_type = String
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 45c78dbf70..477b0620d2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -57,6 +57,7 @@
         <module>connector-datahub-e2e</module>
         <module>connector-mongodb-e2e</module>
         <module>connector-hbase-e2e</module>
+        <module>connector-web3j-e2e</module>
         <module>connector-maxcompute-e2e</module>
         <module>connector-google-firestore-e2e</module>
         <module>connector-rocketmq-e2e</module>

Reply via email to