This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f1326446 [Feature][connector][#1652] Get data from Feishu sheet (#1653)
f1326446 is described below

commit f1326446e04a3433ec2be772b34b686fe3eac0d8
Author: KONENET <[email protected]>
AuthorDate: Fri Apr 8 10:02:09 2022 +0800

    [Feature][connector][#1652] Get data from Feishu sheet (#1653)
    
    * [Feature][connector][#1652] Get data from Feishu sheet
---
 docs/en/connector/source/FeishuSheet.md            |  59 ++++++++
 .../seatunnel-connectors-spark/pom.xml             |   1 +
 .../seatunnel-connector-spark-feishu/pom.xml       |  60 ++++++++
 .../org.apache.seatunnel.spark.BaseSparkSource     |  18 +++
 .../org/apache/seatunnel/spark/feishu/Config.scala |  50 +++++++
 .../seatunnel/spark/feishu/FeishuClient.scala      | 160 +++++++++++++++++++++
 .../spark/feishu/source/FeishuSheet.scala          |  66 +++++++++
 seatunnel-core/seatunnel-core-spark/pom.xml        |   6 +
 8 files changed, 420 insertions(+)

diff --git a/docs/en/connector/source/FeishuSheet.md 
b/docs/en/connector/source/FeishuSheet.md
new file mode 100644
index 00000000..744875d1
--- /dev/null
+++ b/docs/en/connector/source/FeishuSheet.md
@@ -0,0 +1,59 @@
+# Feishu Sheet
+
+## Description
+
+Get data from Feishu sheet
+
+:::tip
+
+Engine Supported and plugin name
+
+* [x] Spark: FeishuSheet
+* [ ] Flink
+
+:::
+
+## Options
+
+| name           | type   | required | default value       |
+| ---------------| ------ |----------|---------------------|
+| appId          | string | yes      | -                   |
+| appSecret      | string | yes      | -                   |
+| sheetToken     | string | yes      | -                   |
+| range          | string | no       | all values in sheet |
+| sheetNum       | int    | no       | 1                   |
+| titleLineNum   | int    | no       | 1                   |
+| ignoreTitleLine| bool   | no       | true                |
+
+* appId and appSecret
+  * These two parameters need to be obtained from the Feishu open platform.
+  * Also, enable the sheet permission in the permission management tab.
+* sheetToken
+  * If your Feishu sheet link is https://xxx.feishu.cn/sheets/shtcnGxninxxxxxxx,
+  then "shtcnGxninxxxxxxx" is the sheetToken.
+* range 
+  * The format is A1:D5 or A2:C4 and so on.
+* sheetNum
+  * If you want to import the first sheet, input 1; the default value is 1.
+  * If you want to import the second sheet, input 2.
+* titleLineNum
+  * By default, the title line is the first line.
+  * If your title line is not the first line, you can change this number accordingly, 
e.g. 2, 3 or 5.
+* ignoreTitleLine
+  * By default, the title line is not saved to the data. If you want to save the 
title line to the data, set this value to false.
+
+### common options [string]
+
+Source plugin common parameters, please refer to [Source 
Plugin](common-options.mdx) for details
+
+## Example
+
+```bash
+    FeishuSheet {
+        result_table_name = "my_dataset"
+        appId = "cli_a2cbxxxxxx"
+        appSecret = "IvhtW7xxxxxxxxxxxxxxx"
+        sheetToken = "shtcn6K3DIixxxxxxxxxxxx"
+        # range = "A1:D4"
+    }
+```
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
index 465d52e8..3f86de60 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
@@ -51,6 +51,7 @@
         <module>seatunnel-connector-spark-tidb</module>
         <module>seatunnel-connector-spark-neo4j</module>
         <module>seatunnel-connector-spark-iceberg</module>
+        <module>seatunnel-connector-spark-feishu</module>
     </modules>
 
 </project>
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
new file mode 100644
index 00000000..fbe4353c
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-spark</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-connector-spark-feishu</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api-spark</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
new file mode 100644
index 00000000..6efa7182
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.spark.feishu.source.FeishuSheet
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/Config.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/Config.scala
new file mode 100644
index 00000000..2f1a6985
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/Config.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.feishu
+
+object Config extends Serializable {
+
+  final val APP_ID = "appId"
+  final val APP_SECRET = "appSecret"
+  final val SHEET_TOKEN = "sheetToken"
+  final val RANGE = "range"
+  final val SHEET_NUM = "sheetNum"
+
+  /**
+   * Which line is title line and below this line will save as data
+   */
+  final val TITLE_LINE_NUM = "titleLineNum"
+
+  /**
+   * The title line don't save as data
+   */
+  final val IGNORE_TITLE_LINE = "ignoreTitleLine"
+
+  // The Feishu response data key
+  final val TENANT_ACCESS_TOKEN = "tenant_access_token"
+  final val DATA = "data"
+  final val VALUE_RANGE = "valueRange"
+  final val VALUES = "values"
+  final val CODE = "code"
+  final val MSG = "msg"
+  final val SHEETS = "sheets"
+  final val SHEET_ID = "sheetId"
+
+  final val TOKEN_URL = 
"https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal?app_id=%s&app_secret=%s";
+  final val META_INFO_URL = 
"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/%s/metainfo";
+  final val SHEET_DATA_URL = 
"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/%s/values/%s%s?valueRenderOption=ToString&dateTimeRenderOption=FormattedString";
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
new file mode 100644
index 00000000..9033d8a0
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.feishu
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
+import org.apache.http.{HttpEntity, HttpHeaders}
+import org.apache.http.client.methods.{CloseableHttpResponse, RequestBuilder}
+import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
+import org.apache.http.util.EntityUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.slf4j.{Logger, LoggerFactory}
+
+class FeishuClient(appId: String, appSecret: String) {
+  val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+  def getToken: String = {
+    val url = Config.TOKEN_URL.format(appId, appSecret)
+    val result = this.requestFeishuApi(url, null)
+    logger.info(s"Request token and get result $result")
+    result.getString(Config.TENANT_ACCESS_TOKEN)
+  }
+
+  def getSheetId(sheetToken: String, sheetNum: Int): String = {
+    val url = Config.META_INFO_URL.format(sheetToken)
+    val data = this.requestFeishuApiAndGetData(url)
+    if (null == data) {
+      throw new RuntimeException(
+        "Did not get any sheet in Feishu, please make sure there is correct 
sheet token!")
+    }
+
+    val sheets = data.getJSONArray(Config.SHEETS)
+    if (null == sheets || sheets.size() < sheetNum) {
+      throw new RuntimeException(s"The sheet $sheetNum is does not exists")
+    }
+
+    val sheetInfo = sheets.getJSONObject(sheetNum - 1)
+    sheetInfo.getString(Config.SHEET_ID)
+  }
+
+  def getSheetData(sheetToken: String, range: String, sheetId: String): 
JSONArray = {
+    var rangeNew = range
+    // the range format is xxx!A1:C3
+    if (!"".equals(rangeNew)) {
+      rangeNew = "!" + rangeNew
+    }
+    val url = Config.SHEET_DATA_URL.format(sheetToken, sheetId, rangeNew)
+    val data = this.requestFeishuApiAndGetData(url)
+    if (null == data) {
+      throw new RuntimeException("The data is empty, please make sure some 
data in sheet.")
+    }
+
+    val valueRange = data.getJSONObject(Config.VALUE_RANGE)
+    if (null == valueRange) {
+      throw new RuntimeException("The data is empty, please make sure some 
data in sheet.")
+    }
+    valueRange.getJSONArray(Config.VALUES)
+  }
+
+  def getDataset(
+      sheetToken: String,
+      range: String,
+      titleLineNum: Int,
+      ignoreTitleLine: Boolean,
+      sheetNum: Int): (ArrayBuffer[Row], StructType) = {
+    val sheetId = this.getSheetId(sheetToken, sheetNum)
+    val values = getSheetData(sheetToken, range, sheetId)
+    if (values.size() < titleLineNum) {
+      throw new RuntimeException("The title line number is larger than data 
rows")
+    }
+    // start from 0
+    var start = titleLineNum - 1
+
+    if (ignoreTitleLine) {
+      start += 1
+    }
+
+    var schema: StructType = null
+    val schemaData = values.getJSONArray(titleLineNum - 1)
+    val fields = ArrayBuffer[StructField]()
+    for (index <- 0 until schemaData.size()) {
+      val titleName = schemaData.getString(index)
+      if (null == titleName) {
+        throw new RuntimeException("The title name is not allowed null")
+      }
+      val field = DataTypes.createStructField(titleName, DataTypes.StringType, 
true)
+      fields += field
+    }
+    schema = DataTypes.createStructType(fields.toArray)
+
+    val rows = ArrayBuffer[Row]()
+    for (index <- start until values.size()) {
+      val jsonArr = values.getJSONArray(index)
+      val arr = ArrayBuffer[String]()
+      for (indexInner <- 0 until jsonArr.size()) {
+        arr += jsonArr.getString(indexInner)
+      }
+
+      val row = Row.fromSeq(arr)
+      rows += row
+    }
+    (rows, schema)
+  }
+
+  def requestFeishuApiAndGetData(url: String): JSONObject = {
+    val result = this.requestFeishuApi(url, this.getToken)
+    result.getJSONObject(Config.DATA)
+  }
+
+  def requestFeishuApi(url: String, token: String): JSONObject = {
+    val httpGet = RequestBuilder.get()
+      .setUri(url)
+      .setHeader(HttpHeaders.AUTHORIZATION, s"Bearer $token")
+      .build()
+
+    var httpClient: CloseableHttpClient = null
+    var resultStr: String = null
+    try {
+      httpClient = HttpClients.createDefault()
+      val response: CloseableHttpResponse = httpClient.execute(httpGet)
+      val statusCode = response.getStatusLine.getStatusCode
+      if (statusCode != 200) {
+        throw new RuntimeException(s"Request feishu api failed! statusCode is 
$statusCode")
+      }
+      val entity: HttpEntity = response.getEntity
+      resultStr = EntityUtils.toString(entity)
+    } catch {
+      case e: Exception => throw e
+    } finally {
+      if (null != httpClient) {
+        httpClient.close()
+      }
+    }
+
+    val result = JSON.parseObject(resultStr)
+    val code = result.getIntValue(Config.CODE)
+    if (code != 0) {
+      val errorMessage = result.getString(Config.MSG)
+      throw new RuntimeException(
+        s"Request feishu api error, the code is: $code and msg is: 
$errorMessage")
+    }
+    result
+  }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/source/FeishuSheet.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/source/FeishuSheet.scala
new file mode 100644
index 00000000..5b289042
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/source/FeishuSheet.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.feishu.source
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.seatunnel.spark.feishu.{Config, FeishuClient}
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.types.StructType
+
+class FeishuSheet extends SparkBatchSource {
+  override def checkConfig(): CheckResult = {
+    checkAllExists(config, Config.APP_ID, Config.APP_SECRET, 
Config.SHEET_TOKEN)
+  }
+
+  /**
+   * This is a lifecycle method, this method will be executed after Plugin 
created.
+   *
+   * @param env environment
+   */
+  override def prepare(env: SparkEnvironment): Unit = {
+    val defaultConfig = ConfigFactory.parseMap(
+      Map(
+        Config.TITLE_LINE_NUM -> 1,
+        Config.IGNORE_TITLE_LINE -> true,
+        Config.RANGE -> "",
+        Config.SHEET_NUM -> 1))
+    config = config.withFallback(defaultConfig)
+  }
+
+  override def getData(env: SparkEnvironment): Dataset[Row] = {
+    val spark = env.getSparkSession
+    val feishuUtil =
+      new FeishuClient(config.getString(Config.APP_ID), 
config.getString(Config.APP_SECRET))
+
+    val (rows: ArrayBuffer[Row], schema: StructType) = feishuUtil.getDataset(
+      config.getString(Config.SHEET_TOKEN),
+      config.getString(Config.RANGE),
+      config.getInt(Config.TITLE_LINE_NUM),
+      config.getBoolean(Config.IGNORE_TITLE_LINE),
+      config.getInt(Config.SHEET_NUM))
+
+    spark.createDataFrame(rows, schema)
+  }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml 
b/seatunnel-core/seatunnel-core-spark/pom.xml
index 18970415..2492f329 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -171,6 +171,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-spark-feishu</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-transform-spark-split</artifactId>

Reply via email to