This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e6c51a95c7 [Improve][Connector-V2]Support multi-table sink feature for httpsink (#6316)
e6c51a95c7 is described below

commit e6c51a95c77542b051af4ac2caaf8cff67213666
Author: lizhenglei <[email protected]>
AuthorDate: Sun Feb 18 11:15:24 2024 +0800

    [Improve][Connector-V2]Support multi-table sink feature for httpsink (#6316)
---
 .../connectors/seatunnel/http/sink/HttpSink.java   |  4 +-
 .../seatunnel/http/sink/HttpSinkWriter.java        |  4 +-
 .../seatunnel/e2e/connector/http/HttpIT.java       | 16 ++++
 .../src/test/resources/fake_to_multitable.conf     | 88 ++++++++++++++++++++++
 .../src/test/resources/mockserver-config.json      | 24 ++++++
 5 files changed, 134 insertions(+), 2 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 1cf22b0164..da1cb0a8da 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,7 +37,8 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
     protected final HttpParameter httpParameter = new HttpParameter();
     protected SeaTunnelRowType seaTunnelRowType;
     protected Config pluginConfig;
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index dc1790733d..0333b8f37a 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.http.sink;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
@@ -32,7 +33,8 @@ import java.io.IOException;
 import java.util.Objects;
 
 @Slf4j
-public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
     protected final HttpClientProvider httpClient;
     protected final SeaTunnelRowType seaTunnelRowType;
     protected final HttpParameter httpParameter;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 405bc5157f..9dc38cbd1c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -19,7 +19,9 @@ package org.apache.seatunnel.e2e.connector.http;
 
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -44,6 +46,8 @@ public class HttpIT extends TestSuiteBase implements TestResource {
 
     private static final String TMP_DIR = "/tmp";
 
+    private static final String successCount = "Total Write Count         :                   2";
+
     private static final String IMAGE = "mockserver/mockserver:5.14.0";
 
     private GenericContainer<?> mockserverContainer;
@@ -162,6 +166,18 @@ public class HttpIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult18.getExitCode());
     }
 
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK/FLINK do not support multiple table read")
+    @TestTemplate
+    public void testMultiTableHttp(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/fake_to_multitable.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertTrue(execResult.getStdout().contains(successCount));
+    }
+
     public String getMockServerConfig() {
         return "/mockserver-config.json";
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf
new file mode 100644
index 0000000000..7ed2ea8a59
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/fake_to_multitable.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "http_sink_1"
+         fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
+      }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "http_sink_2"
+              fields {
+                        id = int
+                        val_bool = boolean
+                        val_int8 = tinyint
+                        val_int16 = smallint
+                        val_int32 = int
+                        val_int64 = bigint
+                        val_float = float
+                        val_double = double
+                        val_decimal = "decimal(16, 1)"
+                        val_string = string
+                        val_unixtime_micros = timestamp
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+
+
+sink {
+   Http {
+        url = "http://mockserver:1080/example/httpMultiTableContentSink";
+        headers {
+            token = "9e32e859ef044462a257e1fc76730066"
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 4ce23c4acb..42d000f713 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4720,5 +4720,29 @@
         "Content-Type": "application/json"
       }
     }
+  },
+  {
+    "httpRequest": {
+      "path": "/example/httpMultiTableContentSink",
+      "method": "POST",
+      "headers": {
+        "token": ["9e32e859ef044462a257e1fc76730066"]
+      }
+    },
+    "httpResponse": {
+      "body": [
+        {
+          "name": "httpMultiTableContentSink",
+          "age": 18
+        },
+        {
+          "name": "pizz2",
+          "age": 19
+        }
+      ],
+      "headers": {
+        "Content-Type": "application/json"
+      }
+    }
   }
 ]

Reply via email to