This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new eb6aaad5 [Feature][Connectors] add source plugin:
seatunnel-connector-spark-webhook (#1695)
eb6aaad5 is described below
commit eb6aaad51f95298379b17faac465160bf57b61e2
Author: tmljob <[email protected]>
AuthorDate: Tue Apr 26 10:15:32 2022 +0800
[Feature][Connectors] add source plugin: seatunnel-connector-spark-webhook
(#1695)
* add source plugin: seatunnel-connector-spark-webhook
* add license description
* modify plugin name
* fix review suggestion
* add plugin description
* provide stop function to close stream
---
docs/en/connector/source/Webhook.md | 42 +++++++++++++++
.../seatunnel-connectors-spark/pom.xml | 1 +
.../seatunnel-connector-spark-webhook/pom.xml | 56 ++++++++++++++++++++
.../org.apache.seatunnel.spark.BaseSparkSource | 18 +++++++
.../seatunnel/spark/webhook/source/HttpData.scala | 28 ++++++++++
.../spark/webhook/source/HttpPushServlet.scala | 38 ++++++++++++++
.../spark/webhook/source/JettyServerStream.scala | 59 ++++++++++++++++++++++
.../seatunnel/spark/webhook/source/Webhook.scala | 50 ++++++++++++++++++
8 files changed, 292 insertions(+)
diff --git a/docs/en/connector/source/Webhook.md
b/docs/en/connector/source/Webhook.md
new file mode 100644
index 00000000..d669ad52
--- /dev/null
+++ b/docs/en/connector/source/Webhook.md
@@ -0,0 +1,42 @@
+# Webhook
+
+## Description
+
+Provides an HTTP endpoint for pushing data; only POST requests are supported.
+
+:::tip
+
+Engine Supported and plugin name
+
+* [x] Spark: Webhook
+* [ ] Flink
+
+:::
+
+## Options
+
+| name | type | required | default value |
+| ---- | ------ | -------- | ------------- |
+| port | int | no | 9999 |
+| path | string | no | / |
+
+### port[int]
+
+Port for push requests, default 9999.
+
+### path[string]
+
+Push request path, default "/".
+
+### common options [string]
+
+Source plugin common parameters, please refer to [Source Plugin](common-options.mdx) for details.
+
+## Example
+
+```
+Webhook {
+    result_table_name = "request_body"
+}
+```
+
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
index 67db31c9..553a54da 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
@@ -53,6 +53,7 @@
<module>seatunnel-connector-spark-iceberg</module>
<module>seatunnel-connector-spark-feishu</module>
<module>seatunnel-connector-spark-http</module>
+ <module>seatunnel-connector-spark-webhook</module>
</modules>
</project>
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/pom.xml
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/pom.xml
new file mode 100644
index 00000000..72fb986d
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-spark</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-connector-spark-webhook</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api-spark</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
new file mode 100644
index 00000000..d66063ec
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.spark.webhook.source.Webhook
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
new file mode 100644
index 00000000..e3a1c9db
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.webhook.source
+
+import java.util.Date
+
+/**
+ * One record captured by the webhook HTTP endpoint; this is the row schema of the
+ * streaming data produced by the source.
+ *
+ * NOTE(review): the stream encoder for this class is derived with `Encoders.product`.
+ * Spark SQL documents `java.sql.Timestamp` as its timestamp type - confirm that a
+ * `java.util.Date` field is accepted by the encoder on the targeted Spark version.
+ *
+ * @param value     the body of the POST request, as text
+ * @param timestamp the moment the record was created, just before it was put on the stream
+ */
+case class HttpData(
+    value: String,
+    timestamp: Date)
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
new file mode 100644
index 00000000..09205047
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.webhook.source
+
+import javax.servlet.http.{HttpServlet, HttpServletRequest,
HttpServletResponse}
+import scala.io.Source
+import org.apache.spark.sql.execution.streaming.MemoryStream
+
+import java.util.Date
+
+/**
+ * Servlet that accepts pushed data over HTTP POST and appends it to a memory stream.
+ *
+ * @param stream memory stream that receives one [[HttpData]] record per POST request
+ */
+class HttpPushServlet(stream: MemoryStream[HttpData]) extends HttpServlet {
+
+  /**
+   * Reads the whole request body as text, adds it to the stream together with the
+   * current time, and answers with a small JSON acknowledgement.
+   *
+   * @param req  incoming POST request whose body is the payload
+   * @param resp response; set to 200 with body `{"success": true}`
+   */
+  override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
+    // Decode with the charset the client declared; fall back to UTF-8 instead of the
+    // JVM platform default so the result does not depend on the host's locale.
+    val charsetName = Option(req.getCharacterEncoding).getOrElse("UTF-8")
+    val source = Source.fromInputStream(req.getInputStream, charsetName)
+    val body =
+      try source.mkString
+      finally source.close()
+
+    stream.addData(HttpData(body, new Date()))
+
+    resp.setContentType("application/json;charset=utf-8")
+    resp.setStatus(HttpServletResponse.SC_OK)
+    resp.getWriter.write("""{"success": true}""")
+  }
+
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
new file mode 100644
index 00000000..78c59eed
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.webhook.source
+
+import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SQLContext}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.spark_project.jetty.server.Server
+import org.spark_project.jetty.servlet.{ServletContextHandler, ServletHolder}
+
+/**
+ * Embedded Jetty server whose POST endpoint feeds a Spark memory stream.
+ *
+ * @param port    port passed to the Jetty server
+ * @param baseUrl servlet path spec on which pushed data is accepted
+ */
+class JettyServerStream(port: Int = 9999, baseUrl: String = "/") {
+
+  // Constructed with the instance; it is only started inside toDF.
+  var server: Server = new Server(port)
+
+  /**
+   * Creates a memory stream, registers an [[HttpPushServlet]] writing to it under
+   * `baseUrl`, starts the server, and returns the stream as a streaming DataFrame.
+   *
+   * @param sqlContext SQL context used to create the memory stream
+   * @return streaming DataFrame backed by the memory stream
+   */
+  def toDF(implicit sqlContext: SQLContext): DataFrame = {
+    implicit val httpDataEncoder: Encoder[HttpData] = Encoders.product[HttpData]
+    val memoryStream = MemoryStream[HttpData]
+
+    // Servlet context rooted at "/", with the push servlet mounted at baseUrl.
+    val handler = new ServletContextHandler(ServletContextHandler.SESSIONS)
+    handler.setContextPath("/")
+    handler.addServlet(new ServletHolder(new HttpPushServlet(memoryStream)), baseUrl)
+    server.setHandler(handler)
+
+    server.start()
+    memoryStream.toDF()
+  }
+
+  /** Stops the Jetty server. */
+  def stop(): Unit = {
+    server.stop()
+  }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
new file mode 100644
index 00000000..eaa5e0ae
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.webhook.source
+
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.stream.SparkStreamingSource
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Row, SparkSession, SQLContext}
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Streaming source that exposes an HTTP endpoint and passes every micro-batch of
+ * pushed requests to the supplied handler.
+ *
+ * Options read from the plugin config: `port` (default 9999) and `path` (default "/").
+ */
+class Webhook extends SparkStreamingSource[String] {
+
+  /**
+   * Starts the HTTP server, runs the streaming query, and blocks until it terminates.
+   * The HTTP server is stopped when the query ends or fails.
+   *
+   * @param env     environment providing the Spark session
+   * @param handler callback invoked with each micro-batch
+   */
+  override def start(env: SparkEnvironment, handler: Dataset[Row] => Unit): Unit = {
+    val spark = env.getSparkSession
+    implicit val sqlContext: SQLContext = spark.sqlContext
+
+    val port = if (config.hasPath("port")) config.getInt("port") else 9999
+    val baseUrl = if (config.hasPath("path")) config.getString("path") else "/"
+
+    // Keep a handle on the server so it can be shut down once the query is over.
+    val server = new JettyServerStream(port, baseUrl)
+    try {
+      val query = server.toDF
+        .writeStream
+        .foreachBatch((batch: Dataset[Row], _: Long) => handler(batch))
+        .start()
+
+      query.awaitTermination()
+    } finally {
+      server.stop()
+    }
+  }
+
+  /**
+   * Not implemented: always returns null. `start` above does not call it.
+   * NOTE(review): confirm no other framework code path invokes it.
+   */
+  override def rdd2dataset(sparkSession: SparkSession, rdd: RDD[String]): Dataset[Row] = { null }
+
+  /**
+   * Not implemented: always returns null. `start` above does not call it.
+   * NOTE(review): confirm no other framework code path invokes it.
+   */
+  override def getData(env: SparkEnvironment): DStream[String] = { null }
+}