This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f1326446 [Feature][connector][#1652] Get data from Feishu sheet (#1653)
f1326446 is described below
commit f1326446e04a3433ec2be772b34b686fe3eac0d8
Author: KONENET <[email protected]>
AuthorDate: Fri Apr 8 10:02:09 2022 +0800
[Feature][connector][#1652] Get data from Feishu sheet (#1653)
* [Feature][connector][#1652] Get data from Feishu sheet
---
docs/en/connector/source/FeishuSheet.md | 59 ++++++++
.../seatunnel-connectors-spark/pom.xml | 1 +
.../seatunnel-connector-spark-feishu/pom.xml | 60 ++++++++
.../org.apache.seatunnel.spark.BaseSparkSource | 18 +++
.../org/apache/seatunnel/spark/feishu/Config.scala | 50 +++++++
.../seatunnel/spark/feishu/FeishuClient.scala | 160 +++++++++++++++++++++
.../spark/feishu/source/FeishuSheet.scala | 66 +++++++++
seatunnel-core/seatunnel-core-spark/pom.xml | 6 +
8 files changed, 420 insertions(+)
diff --git a/docs/en/connector/source/FeishuSheet.md
b/docs/en/connector/source/FeishuSheet.md
new file mode 100644
index 00000000..744875d1
--- /dev/null
+++ b/docs/en/connector/source/FeishuSheet.md
@@ -0,0 +1,59 @@
+# Feishu Sheet
+
+## Description
+
+Get data from Feishu sheet
+
+:::tip
+
+Engine Supported and plugin name
+
+* [x] Spark: FeishuSheet
+* [ ] Flink
+
+:::
+
+## Options
+
+| name | type | required | default value |
+| ---------------| ------ |----------|---------------------|
+| appId | string | yes | - |
+| appSecret | string | yes | - |
+| sheetToken | string | yes | - |
+| range | string | no | all values in sheet |
+| sheetNum | int | no | 1 |
+| titleLineNum | int | no | 1 |
+| ignoreTitleLine| bool | no | true |
+
+* appId and appSecret
+  * These two parameters need to be obtained from the Feishu open platform.
+  * And open the sheet permission in the permission management tab.
+* sheetToken
+  * If your Feishu sheet link is https://xxx.feishu.cn/sheets/shtcnGxninxxxxxxx
  then "shtcnGxninxxxxxxx" is the sheetToken.
+* range
+ * The format is A1:D5 or A2:C4 and so on.
+* sheetNum
+  * If you want to import the first sheet, you can input 1; the default value is 1.
+  * If you want to import the second one, you should input 2.
+* titleLineNum
+  * By default, the title line is the first line.
+  * If your title line is not the first line, you can change the number
 accordingly, e.g. 2, 3 or 5.
+* ignoreTitleLine
+  * The title line is not saved to the data by default; if you want to save
 the title line to the data, set this value to false.
+
+### common options [string]
+
+Source plugin common parameters, please refer to [Source
Plugin](common-options.mdx) for details
+
+## Example
+
+```bash
+ FeishuSheet {
+ result_table_name = "my_dataset"
+ appId = "cli_a2cbxxxxxx"
+ appSecret = "IvhtW7xxxxxxxxxxxxxxx"
+ sheetToken = "shtcn6K3DIixxxxxxxxxxxx"
+ # range = "A1:D4"
+ }
+```
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
index 465d52e8..3f86de60 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/pom.xml
@@ -51,6 +51,7 @@
<module>seatunnel-connector-spark-tidb</module>
<module>seatunnel-connector-spark-neo4j</module>
<module>seatunnel-connector-spark-iceberg</module>
+ <module>seatunnel-connector-spark-feishu</module>
</modules>
</project>
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
new file mode 100644
index 00000000..fbe4353c
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-spark</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-connector-spark-feishu</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api-spark</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
new file mode 100644
index 00000000..6efa7182
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.spark.feishu.source.FeishuSheet
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/Config.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/Config.scala
new file mode 100644
index 00000000..2f1a6985
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/Config.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.feishu
+
+object Config extends Serializable {
+
+ final val APP_ID = "appId"
+ final val APP_SECRET = "appSecret"
+ final val SHEET_TOKEN = "sheetToken"
+ final val RANGE = "range"
+ final val SHEET_NUM = "sheetNum"
+
+ /**
+ * Which line is title line and below this line will save as data
+ */
+ final val TITLE_LINE_NUM = "titleLineNum"
+
+ /**
+ * The title line don't save as data
+ */
+ final val IGNORE_TITLE_LINE = "ignoreTitleLine"
+
+ // The Feishu response data key
+ final val TENANT_ACCESS_TOKEN = "tenant_access_token"
+ final val DATA = "data"
+ final val VALUE_RANGE = "valueRange"
+ final val VALUES = "values"
+ final val CODE = "code"
+ final val MSG = "msg"
+ final val SHEETS = "sheets"
+ final val SHEET_ID = "sheetId"
+
+ final val TOKEN_URL =
"https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal?app_id=%s&app_secret=%s"
+ final val META_INFO_URL =
"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/%s/metainfo"
+ final val SHEET_DATA_URL =
"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/%s/values/%s%s?valueRenderOption=ToString&dateTimeRenderOption=FormattedString"
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
new file mode 100644
index 00000000..9033d8a0
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.feishu
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
+import org.apache.http.{HttpEntity, HttpHeaders}
+import org.apache.http.client.methods.{CloseableHttpResponse, RequestBuilder}
+import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
+import org.apache.http.util.EntityUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.slf4j.{Logger, LoggerFactory}
+
+class FeishuClient(appId: String, appSecret: String) {
+ val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+ def getToken: String = {
+ val url = Config.TOKEN_URL.format(appId, appSecret)
+ val result = this.requestFeishuApi(url, null)
+ logger.info(s"Request token and get result $result")
+ result.getString(Config.TENANT_ACCESS_TOKEN)
+ }
+
+ def getSheetId(sheetToken: String, sheetNum: Int): String = {
+ val url = Config.META_INFO_URL.format(sheetToken)
+ val data = this.requestFeishuApiAndGetData(url)
+ if (null == data) {
+ throw new RuntimeException(
+ "Did not get any sheet in Feishu, please make sure there is correct
sheet token!")
+ }
+
+ val sheets = data.getJSONArray(Config.SHEETS)
+ if (null == sheets || sheets.size() < sheetNum) {
+ throw new RuntimeException(s"The sheet $sheetNum is does not exists")
+ }
+
+ val sheetInfo = sheets.getJSONObject(sheetNum - 1)
+ sheetInfo.getString(Config.SHEET_ID)
+ }
+
+ def getSheetData(sheetToken: String, range: String, sheetId: String):
JSONArray = {
+ var rangeNew = range
+ // the range format is xxx!A1:C3
+ if (!"".equals(rangeNew)) {
+ rangeNew = "!" + rangeNew
+ }
+ val url = Config.SHEET_DATA_URL.format(sheetToken, sheetId, rangeNew)
+ val data = this.requestFeishuApiAndGetData(url)
+ if (null == data) {
+ throw new RuntimeException("The data is empty, please make sure some
data in sheet.")
+ }
+
+ val valueRange = data.getJSONObject(Config.VALUE_RANGE)
+ if (null == valueRange) {
+ throw new RuntimeException("The data is empty, please make sure some
data in sheet.")
+ }
+ valueRange.getJSONArray(Config.VALUES)
+ }
+
+ def getDataset(
+ sheetToken: String,
+ range: String,
+ titleLineNum: Int,
+ ignoreTitleLine: Boolean,
+ sheetNum: Int): (ArrayBuffer[Row], StructType) = {
+ val sheetId = this.getSheetId(sheetToken, sheetNum)
+ val values = getSheetData(sheetToken, range, sheetId)
+ if (values.size() < titleLineNum) {
+ throw new RuntimeException("The title line number is larger than data
rows")
+ }
+ // start from 0
+ var start = titleLineNum - 1
+
+ if (ignoreTitleLine) {
+ start += 1
+ }
+
+ var schema: StructType = null
+ val schemaData = values.getJSONArray(titleLineNum - 1)
+ val fields = ArrayBuffer[StructField]()
+ for (index <- 0 until schemaData.size()) {
+ val titleName = schemaData.getString(index)
+ if (null == titleName) {
+ throw new RuntimeException("The title name is not allowed null")
+ }
+ val field = DataTypes.createStructField(titleName, DataTypes.StringType,
true)
+ fields += field
+ }
+ schema = DataTypes.createStructType(fields.toArray)
+
+ val rows = ArrayBuffer[Row]()
+ for (index <- start until values.size()) {
+ val jsonArr = values.getJSONArray(index)
+ val arr = ArrayBuffer[String]()
+ for (indexInner <- 0 until jsonArr.size()) {
+ arr += jsonArr.getString(indexInner)
+ }
+
+ val row = Row.fromSeq(arr)
+ rows += row
+ }
+ (rows, schema)
+ }
+
+ def requestFeishuApiAndGetData(url: String): JSONObject = {
+ val result = this.requestFeishuApi(url, this.getToken)
+ result.getJSONObject(Config.DATA)
+ }
+
+ def requestFeishuApi(url: String, token: String): JSONObject = {
+ val httpGet = RequestBuilder.get()
+ .setUri(url)
+ .setHeader(HttpHeaders.AUTHORIZATION, s"Bearer $token")
+ .build()
+
+ var httpClient: CloseableHttpClient = null
+ var resultStr: String = null
+ try {
+ httpClient = HttpClients.createDefault()
+ val response: CloseableHttpResponse = httpClient.execute(httpGet)
+ val statusCode = response.getStatusLine.getStatusCode
+ if (statusCode != 200) {
+ throw new RuntimeException(s"Request feishu api failed! statusCode is
$statusCode")
+ }
+ val entity: HttpEntity = response.getEntity
+ resultStr = EntityUtils.toString(entity)
+ } catch {
+ case e: Exception => throw e
+ } finally {
+ if (null != httpClient) {
+ httpClient.close()
+ }
+ }
+
+ val result = JSON.parseObject(resultStr)
+ val code = result.getIntValue(Config.CODE)
+ if (code != 0) {
+ val errorMessage = result.getString(Config.MSG)
+ throw new RuntimeException(
+ s"Request feishu api error, the code is: $code and msg is:
$errorMessage")
+ }
+ result
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/source/FeishuSheet.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/source/FeishuSheet.scala
new file mode 100644
index 00000000..5b289042
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/source/FeishuSheet.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.feishu.source
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
+import org.apache.seatunnel.spark.SparkEnvironment
+import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.seatunnel.spark.feishu.{Config, FeishuClient}
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.types.StructType
+
+class FeishuSheet extends SparkBatchSource {
+ override def checkConfig(): CheckResult = {
+ checkAllExists(config, Config.APP_ID, Config.APP_SECRET,
Config.SHEET_TOKEN)
+ }
+
+ /**
+ * This is a lifecycle method, this method will be executed after Plugin
created.
+ *
+ * @param env environment
+ */
+ override def prepare(env: SparkEnvironment): Unit = {
+ val defaultConfig = ConfigFactory.parseMap(
+ Map(
+ Config.TITLE_LINE_NUM -> 1,
+ Config.IGNORE_TITLE_LINE -> true,
+ Config.RANGE -> "",
+ Config.SHEET_NUM -> 1))
+ config = config.withFallback(defaultConfig)
+ }
+
+ override def getData(env: SparkEnvironment): Dataset[Row] = {
+ val spark = env.getSparkSession
+ val feishuUtil =
+ new FeishuClient(config.getString(Config.APP_ID),
config.getString(Config.APP_SECRET))
+
+ val (rows: ArrayBuffer[Row], schema: StructType) = feishuUtil.getDataset(
+ config.getString(Config.SHEET_TOKEN),
+ config.getString(Config.RANGE),
+ config.getInt(Config.TITLE_LINE_NUM),
+ config.getBoolean(Config.IGNORE_TITLE_LINE),
+ config.getInt(Config.SHEET_NUM))
+
+ spark.createDataFrame(rows, schema)
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml
b/seatunnel-core/seatunnel-core-spark/pom.xml
index 18970415..2492f329 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -171,6 +171,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-spark-feishu</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-transform-spark-split</artifactId>