This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new eb6aaad5 [Feature][Connectors] add source plugin: 
seatunnel-connector-spark-webhook (#1695)
eb6aaad5 is described below

commit eb6aaad51f95298379b17faac465160bf57b61e2
Author: tmljob <[email protected]>
AuthorDate: Tue Apr 26 10:15:32 2022 +0800

    [Feature][Connectors] add source plugin: seatunnel-connector-spark-webhook 
(#1695)
    
    * add source plugin: seatunnel-connector-spark-webhook
    
    * add license description
    
    * modify plugin name
    
    * fix review suggestion
    
    * add plugin description
    
    * provide stop function to close stream
---
 docs/en/connector/source/Webhook.md                | 42 +++++++++++++++
 .../seatunnel-connectors-spark/pom.xml             |  1 +
 .../seatunnel-connector-spark-webhook/pom.xml      | 56 ++++++++++++++++++++
 .../org.apache.seatunnel.spark.BaseSparkSource     | 18 +++++++
 .../seatunnel/spark/webhook/source/HttpData.scala  | 28 ++++++++++
 .../spark/webhook/source/HttpPushServlet.scala     | 38 ++++++++++++++
 .../spark/webhook/source/JettyServerStream.scala   | 59 ++++++++++++++++++++++
 .../seatunnel/spark/webhook/source/Webhook.scala   | 50 ++++++++++++++++++
 8 files changed, 292 insertions(+)

diff --git a/docs/en/connector/source/Webhook.md 
b/docs/en/connector/source/Webhook.md
new file mode 100644
index 00000000..d669ad52
--- /dev/null
+++ b/docs/en/connector/source/Webhook.md
@@ -0,0 +1,42 @@
+# Webhook
+
+## Description
+
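+Provides an HTTP interface for pushing data. Only POST requests are supported; each request body becomes one row with a `value` column (the raw request body as a string) and a `timestamp` column (the time the request was received).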
+
+:::tip
+
+Engine Supported and plugin name
+
+* [x] Spark: Webhook
+* [ ] Flink
+
+:::
+
+## Options
+
+| name | type   | required | default value |
+| ---- | ------ | -------- | ------------- |
+| port | int    | no       | 9999          |
+| path | string | no       | /             |
+
+### port[int]
+
+Port for push requests, default 9999.
+
+### path[string]
+
+Push request path, default "/".
+
+### common options [string]
+
+Source plugin common parameters; please refer to [Source Plugin](common-options.mdx) for details.
+
+## Example
+
+```
+Webhook {
+  result_table_name = "request_body"
+}
+```
+
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
index 67db31c9..553a54da 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
@@ -53,6 +53,7 @@
         <module>seatunnel-connector-spark-iceberg</module>
         <module>seatunnel-connector-spark-feishu</module>
         <module>seatunnel-connector-spark-http</module>
+        <module>seatunnel-connector-spark-webhook</module>
     </modules>
 
 </project>
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/pom.xml
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/pom.xml
new file mode 100644
index 00000000..72fb986d
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>seatunnel-connectors-spark</artifactId>
+    <version>${revision}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>seatunnel-connector-spark-webhook</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.seatunnel</groupId>
+      <artifactId>seatunnel-api-spark</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+    </dependency>
+
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
new file mode 100644
index 00000000..d66063ec
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.spark.webhook.source.Webhook
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
new file mode 100644
index 00000000..e3a1c9db
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.webhook.source
+
+import java.util.Date
+
+/**
+ * Streaming data read from local server will have this schema
+ *
+ * @param value - The payload POSTed to http endpoint.
+ * @param timestamp - Time the payload was put on the stream. Declared as
+ *                  `java.sql.Timestamp` because Spark encoders (built with
+ *                  `Encoders.product[HttpData]`) have no mapping for `java.util.Date`.
+ */
+case class HttpData(value: String, timestamp: java.sql.Timestamp)
+
+object HttpData {
+
+  /**
+   * Source-compatible factory for callers that still pass a plain `java.util.Date`.
+   * The value is converted to a `java.sql.Timestamp`; a null date stays null.
+   */
+  def apply(value: String, timestamp: Date): HttpData =
+    new HttpData(value, Option(timestamp).map(d => new java.sql.Timestamp(d.getTime)).orNull)
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
new file mode 100644
index 00000000..09205047
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.webhook.source
+
+import javax.servlet.http.{HttpServlet, HttpServletRequest, 
HttpServletResponse}
+import scala.io.Source
+import org.apache.spark.sql.execution.streaming.MemoryStream
+
+import java.util.Date
+
+/**
+ * Servlet that turns every HTTP POST request into one [[HttpData]] record on the
+ * given memory stream. Only `doPost` is overridden.
+ */
+class HttpPushServlet(stream: MemoryStream[HttpData]) extends HttpServlet {
+
+  /**
+   * Reads the whole request body, appends it to `stream` together with the current
+   * time, and replies with status 200 and the JSON body `{"success": true}`.
+   *
+   * The body is decoded with the charset declared by the request (`Content-Type`),
+   * falling back to UTF-8 rather than the JVM default charset, so non-ASCII
+   * payloads are not garbled on hosts whose default charset differs.
+   */
+  override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
+    val charset = Option(req.getCharacterEncoding).getOrElse("UTF-8")
+    val reqBody = Source.fromInputStream(req.getInputStream, charset).mkString
+    val receivedAt = new Date(System.currentTimeMillis())
+    stream.addData(HttpData(reqBody, receivedAt))
+
+    resp.setContentType("application/json;charset=utf-8")
+    resp.setStatus(HttpServletResponse.SC_OK)
+    resp.getWriter.write("""{"success": true}""")
+  }
+
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
new file mode 100644
index 00000000..78c59eed
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.webhook.source
+
+import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SQLContext}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.spark_project.jetty.server.Server
+import org.spark_project.jetty.servlet.{ServletContextHandler, ServletHolder}
+
+/**
+ * Embedded Jetty HTTP server whose POSTed payloads are exposed as a streaming DataFrame.
+ *
+ * @param port    port the server listens on
+ * @param baseUrl servlet path the push endpoint is mapped to
+ */
+class JettyServerStream(port: Int = 9999, baseUrl: String = "/") {
+
+  // Embedded server bound to `port`; it is created here but only started by `toDF`.
+  var server: Server = new Server(port)
+
+  /**
+   * Starts an HTTP server and initializes a memory stream.
+   * Every request handled by [[HttpPushServlet]] at `baseUrl` is put on the memory
+   * stream, and a streaming DataFrame built on that stream is returned.
+   *
+   * @param sqlContext context used to create the backing `MemoryStream`
+   * @return a streaming DataFrame with the columns of [[HttpData]] (`value`, `timestamp`)
+   * @throws Exception if the server fails to start (for example, when the port is
+   *                   already in use); the server is stopped before the error is rethrown
+   */
+  def toDF(implicit sqlContext: SQLContext): DataFrame = {
+
+    // Create a memory stream of HttpData records
+    implicit val enc: Encoder[HttpData] = Encoders.product[HttpData]
+    val stream = MemoryStream[HttpData]
+
+    val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
+    context.setContextPath("/")
+    server.setHandler(context)
+
+    val servlet = new HttpPushServlet(stream)
+    context.addServlet(new ServletHolder(servlet), baseUrl)
+
+    try {
+      server.start()
+    } catch {
+      case scala.util.control.NonFatal(startErr) =>
+        // Stop the server so a failed start leaves none of its components running.
+        try server.stop() catch {
+          case scala.util.control.NonFatal(stopErr) => startErr.addSuppressed(stopErr)
+        }
+        throw startErr
+    }
+    stream.toDF()
+  }
+
+  /** Stops the embedded HTTP server started by `toDF`. */
+  def stop(): Unit = {
+    server.stop()
+  }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
new file mode 100644
index 00000000..eaa5e0ae
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.webhook.source
+
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.stream.SparkStreamingSource
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Dataset, Row, SparkSession, SQLContext}
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Spark streaming source plugin "Webhook": receives data pushed via HTTP POST and
+ * hands it to the pipeline as micro-batches of a streaming Dataset.
+ */
+class Webhook extends SparkStreamingSource[String] {
+
+  /**
+   * Starts an embedded HTTP server on the configured `port` (default 9999) and
+   * `path` (default "/"), then runs a Structured Streaming query that passes every
+   * micro-batch of received payloads to `handler`.
+   *
+   * Blocks until the streaming query terminates. The HTTP server is stopped
+   * afterwards, whether the query ends normally or with an exception.
+   */
+  override def start(env: SparkEnvironment, handler: Dataset[Row] => Unit): Unit = {
+    // JettyServerStream.toDF needs an implicit SQLContext to create its MemoryStream.
+    implicit val sqlContext: SQLContext = env.getSparkSession.sqlContext
+
+    val port = if (config.hasPath("port")) config.getInt("port") else 9999
+    val baseUrl = if (config.hasPath("path")) config.getString("path") else "/"
+
+    val httpServer = new JettyServerStream(port, baseUrl)
+    try {
+      val query = httpServer.toDF
+        .writeStream
+        // Typed parameters select the Scala overload of foreachBatch; an untyped
+        // lambda is ambiguous against the Java VoidFunction2 overload on Scala 2.12.
+        .foreachBatch((batch: Dataset[Row], _: Long) => handler(batch))
+        .start()
+
+      query.awaitTermination()
+    } finally {
+      // Release the HTTP port once the query is over (or if it failed to start).
+      httpServer.stop()
+    }
+  }
+
+  // NOTE(review): both methods below return null. This source overrides `start` and
+  // never calls them itself — confirm the framework does not call them either.
+  override def rdd2dataset(sparkSession: SparkSession, rdd: RDD[String]): Dataset[Row] = null
+
+  override def getData(env: SparkEnvironment): DStream[String] = null
+}

Reply via email to