This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new b51b3fb07 feat: collect feishu chat(group) & message (#5001)
b51b3fb07 is described below

commit b51b3fb07861dd7c94f064d7118f83a4e2f62f2d
Author: Likyh <[email protected]>
AuthorDate: Fri May 12 08:55:51 2023 +0800

    feat: collect feishu chat(group) & message (#5001)
    
    * feat: collect feishu chat(group) & message
    
    * fix: fix for ci
    
    * fix: delete a unused code
---
 backend/plugins/feishu/apimodels/im_result.go      |  58 ++++++++++
 backend/plugins/feishu/impl/impl.go                |   6 ++
 .../{tasks/task_data.go => models/chat_item.go}    |  29 ++---
 backend/plugins/feishu/models/message.go           |  45 ++++++++
 ..._init_tables.go => 20230421_add_init_tables.go} |  10 +-
 .../migrationscripts/archived/chat_item.go}        |  29 ++---
 .../models/migrationscripts/archived/message.go    |  45 ++++++++
 backend/plugins/feishu/tasks/api_client.go         |   4 +-
 backend/plugins/feishu/tasks/chat_collector.go     |  92 ++++++++++++++++
 .../tasks/{api_client.go => chat_extractor.go}     |  45 +++++---
 backend/plugins/feishu/tasks/message_collector.go  | 119 +++++++++++++++++++++
 backend/plugins/feishu/tasks/message_extractor.go  |  87 +++++++++++++++
 backend/plugins/feishu/tasks/task_data.go          |   6 +-
 13 files changed, 526 insertions(+), 49 deletions(-)

diff --git a/backend/plugins/feishu/apimodels/im_result.go 
b/backend/plugins/feishu/apimodels/im_result.go
new file mode 100644
index 000000000..2f900dbff
--- /dev/null
+++ b/backend/plugins/feishu/apimodels/im_result.go
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package apimodels
+
+import "encoding/json"
+
+// FeishuImApiResult is the response envelope decoded by the chat and message
+// collectors. Items are kept as raw JSON so each collector can hand them on
+// undecoded.
+type FeishuImApiResult struct {
+       Code int `json:"code"`
+       Data struct {
+               // HasMore and PageToken drive pagination: the collectors stop
+               // when HasMore is false, otherwise they pass PageToken on as
+               // the page_token query parameter of the next request.
+               HasMore   bool              `json:"has_more"`
+               Items     []json.RawMessage `json:"items"`
+               PageToken string            `json:"page_token"`
+       } `json:"data"`
+       Msg string `json:"msg"`
+}
+
+// FeishuMessageResultItem mirrors one raw message item as decoded by the
+// message extractor. CreateTime and UpdateTime are strings here; the
+// extractor parses them as integers and interprets them as Unix
+// milliseconds.
+type FeishuMessageResultItem struct {
+       Body struct {
+               Content string `json:"content"`
+       } `json:"body"`
+       ChatId     string `json:"chat_id"`
+       CreateTime string `json:"create_time"`
+       Deleted    bool   `json:"deleted"`
+       Mentions   []struct {
+               Id        string `json:"id"`
+               IdType    string `json:"id_type"`
+               Key       string `json:"key"`
+               Name      string `json:"name"`
+               TenantKey string `json:"tenant_key"`
+       } `json:"mentions"`
+       MessageId string `json:"message_id"`
+       MsgType   string `json:"msg_type"`
+       ParentId  string `json:"parent_id"`
+       RootId    string `json:"root_id"`
+       Sender    struct {
+               Id         string `json:"id"`
+               IdType     string `json:"id_type"`
+               SenderType string `json:"sender_type"`
+               TenantKey  string `json:"tenant_key"`
+       } `json:"sender"`
+       UpdateTime string `json:"update_time"`
+       Updated    bool   `json:"updated"`
+}
diff --git a/backend/plugins/feishu/impl/impl.go 
b/backend/plugins/feishu/impl/impl.go
index 3f6952fea..113463a5d 100644
--- a/backend/plugins/feishu/impl/impl.go
+++ b/backend/plugins/feishu/impl/impl.go
@@ -59,6 +59,12 @@ func (p Feishu) Description() string {
 
 func (p Feishu) SubTaskMetas() []plugin.SubTaskMeta {
        return []plugin.SubTaskMeta{
+               tasks.CollectChatMeta,
+               tasks.ExtractChatItemMeta,
+
+               tasks.CollectMessageMeta,
+               tasks.ExtractMessageMeta,
+
                tasks.CollectMeetingTopUserItemMeta,
                tasks.ExtractMeetingTopUserItemMeta,
        }
diff --git a/backend/plugins/feishu/tasks/task_data.go 
b/backend/plugins/feishu/models/chat_item.go
similarity index 55%
copy from backend/plugins/feishu/tasks/task_data.go
copy to backend/plugins/feishu/models/chat_item.go
index 79099bc6a..8e80dd312 100644
--- a/backend/plugins/feishu/tasks/task_data.go
+++ b/backend/plugins/feishu/models/chat_item.go
@@ -15,24 +15,25 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package tasks
+package models
 
 import (
-       helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "github.com/apache/incubator-devlake/plugins/feishu/models"
+       "github.com/apache/incubator-devlake/core/models/common"
 )
 
-type FeishuApiParams struct {
-       ConnectionId uint64 `json:"connectionId"`
+// FeishuChatItem is the tool-layer record for one Feishu chat. Its table is
+// _tool_feishu_chats and its primary key is (ConnectionId, ChatId).
+type FeishuChatItem struct {
+       common.NoPKModel `json:"-"`
+       ConnectionId     uint64 `gorm:"primaryKey"`
+       ChatId           string `json:"chat_id" gorm:"primaryKey"`
+       Avatar           string `json:"avatar"`
+       Description      string `json:"description"`
+       External         bool   `json:"external"`
+       Name             string `json:"name"`
+       OwnerId          string `json:"owner_id"`
+       OwnerIdType      string `json:"owner_id_type"`
+       TenantKey        string `json:"tenant_key"`
 }
 
-type FeishuOptions struct {
-       ConnectionId       uint64  `json:"connectionId"`
-       NumOfDaysToCollect float64 `json:"numOfDaysToCollect"`
-}
-
-type FeishuTaskData struct {
-       Options                  *FeishuOptions
-       ApiClient                *helper.ApiAsyncClient
-       FeishuMeetingTopUserItem *models.FeishuMeetingTopUserItem
+// TableName returns "_tool_feishu_chats", the table name for FeishuChatItem.
+func (FeishuChatItem) TableName() string {
+       return "_tool_feishu_chats"
 }
diff --git a/backend/plugins/feishu/models/message.go 
b/backend/plugins/feishu/models/message.go
new file mode 100644
index 000000000..e2a42208f
--- /dev/null
+++ b/backend/plugins/feishu/models/message.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package models
+
+import (
+       "github.com/apache/incubator-devlake/core/models/common"
+       "time"
+)
+
+// FeishuMessage is the tool-layer record for one Feishu message. Its table
+// is _tool_feishu_messages and its primary key is (ConnectionId, MessageId).
+type FeishuMessage struct {
+       common.NoPKModel `json:"-"`
+       ConnectionId     uint64    `gorm:"primaryKey"`
+       MessageId        string    `json:"message_id" gorm:"primaryKey"`
+       Content          string    `json:"content"`
+       ChatId           string    `json:"chat_id"`
+       MsgType          string    `json:"msg_type"`
+       ParentId         string    `json:"parent_id"`
+       RootId           string    `json:"root_id"`
+       SenderId         string    `json:"id"`
+       SenderIdType     string    `json:"id_type"`
+       SenderType       string    `json:"sender_type"`
+       Deleted          bool      `json:"deleted"`
+       CreateTime       time.Time `json:"create_time"`
+       UpdateTime       time.Time `json:"update_time"`
+       Updated          bool      `json:"updated"`
+}
+
+// TableName returns "_tool_feishu_messages", the table name for
+// FeishuMessage.
+func (FeishuMessage) TableName() string {
+       return "_tool_feishu_messages"
+}
diff --git 
a/backend/plugins/feishu/models/migrationscripts/20220714_add_init_tables.go 
b/backend/plugins/feishu/models/migrationscripts/20230421_add_init_tables.go
similarity index 89%
rename from 
backend/plugins/feishu/models/migrationscripts/20220714_add_init_tables.go
rename to 
backend/plugins/feishu/models/migrationscripts/20230421_add_init_tables.go
index e143e59f0..b111e43dc 100644
--- a/backend/plugins/feishu/models/migrationscripts/20220714_add_init_tables.go
+++ b/backend/plugins/feishu/models/migrationscripts/20230421_add_init_tables.go
@@ -34,6 +34,8 @@ func (u *addInitTables) Up(basicRes context.BasicRes) 
errors.Error {
        err := db.DropTables(
                &archived.FeishuConnection{},
                &archived.FeishuMeetingTopUserItem{},
+               &archived.FeishuChatItem{},
+               &archived.FeishuMessage{},
        )
        if err != nil {
                return err
@@ -43,6 +45,8 @@ func (u *addInitTables) Up(basicRes context.BasicRes) 
errors.Error {
                basicRes,
                &archived.FeishuConnection{},
                &archived.FeishuMeetingTopUserItem{},
+               &archived.FeishuChatItem{},
+               &archived.FeishuMessage{},
        )
        if err != nil {
                return err
@@ -55,6 +59,10 @@ func (u *addInitTables) Up(basicRes context.BasicRes) 
errors.Error {
        connection.SecretKey = basicRes.GetConfig(`FEISHU_APPSCRECT`)
        connection.Name = `Feishu`
        if connection.Endpoint != `` && connection.AppId != `` && 
connection.SecretKey != `` && encodeKey != `` {
+               connection.SecretKey, err = plugin.Encrypt(encodeKey, 
connection.SecretKey)
+               if err != nil {
+                       return err
+               }
                // update from .env and save to db
                err = db.CreateIfNotExist(connection)
                if err != nil {
@@ -65,7 +73,7 @@ func (u *addInitTables) Up(basicRes context.BasicRes) 
errors.Error {
 }
 
+// Version returns 20230421000001, the version number of this migration
+// script.
 func (*addInitTables) Version() uint64 {
-       return 20220714000001
+       return 20230421000001
 }
 
 func (*addInitTables) Name() string {
diff --git a/backend/plugins/feishu/tasks/task_data.go 
b/backend/plugins/feishu/models/migrationscripts/archived/chat_item.go
similarity index 54%
copy from backend/plugins/feishu/tasks/task_data.go
copy to backend/plugins/feishu/models/migrationscripts/archived/chat_item.go
index 79099bc6a..fc5669990 100644
--- a/backend/plugins/feishu/tasks/task_data.go
+++ b/backend/plugins/feishu/models/migrationscripts/archived/chat_item.go
@@ -15,24 +15,25 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package tasks
+package archived
 
 import (
-       helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "github.com/apache/incubator-devlake/plugins/feishu/models"
+       
"github.com/apache/incubator-devlake/core/models/migrationscripts/archived"
 )
 
-type FeishuApiParams struct {
-       ConnectionId uint64 `json:"connectionId"`
+// FeishuChatItem is the schema of _tool_feishu_chats as referenced by the
+// migration scripts. Its primary key is (ConnectionId, ChatId).
+type FeishuChatItem struct {
+       archived.NoPKModel `json:"-"`
+       ConnectionId       uint64 `gorm:"primaryKey"`
+       ChatId             string `json:"chat_id" gorm:"primaryKey"`
+       Avatar             string `json:"avatar"`
+       Description        string `json:"description"`
+       External           bool   `json:"external"`
+       Name               string `json:"name"`
+       OwnerId            string `json:"owner_id"`
+       OwnerIdType        string `json:"owner_id_type"`
+       TenantKey          string `json:"tenant_key"`
 }
 
-type FeishuOptions struct {
-       ConnectionId       uint64  `json:"connectionId"`
-       NumOfDaysToCollect float64 `json:"numOfDaysToCollect"`
-}
-
-type FeishuTaskData struct {
-       Options                  *FeishuOptions
-       ApiClient                *helper.ApiAsyncClient
-       FeishuMeetingTopUserItem *models.FeishuMeetingTopUserItem
+// TableName returns "_tool_feishu_chats", the table name for FeishuChatItem.
+func (FeishuChatItem) TableName() string {
+       return "_tool_feishu_chats"
 }
diff --git a/backend/plugins/feishu/models/migrationscripts/archived/message.go 
b/backend/plugins/feishu/models/migrationscripts/archived/message.go
new file mode 100644
index 000000000..e8bebebd3
--- /dev/null
+++ b/backend/plugins/feishu/models/migrationscripts/archived/message.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package archived
+
+import (
+       
"github.com/apache/incubator-devlake/core/models/migrationscripts/archived"
+       "time"
+)
+
+// FeishuMessage is the schema of _tool_feishu_messages as referenced by the
+// migration scripts. Its primary key is (ConnectionId, MessageId).
+type FeishuMessage struct {
+       archived.NoPKModel `json:"-"`
+       ConnectionId       uint64    `gorm:"primaryKey"`
+       MessageId          string    `json:"message_id" gorm:"primaryKey"`
+       Content            string    `json:"content"`
+       ChatId             string    `json:"chat_id"`
+       MsgType            string    `json:"msg_type"`
+       ParentId           string    `json:"parent_id"`
+       RootId             string    `json:"root_id"`
+       SenderId           string    `json:"id"`
+       SenderIdType       string    `json:"id_type"`
+       SenderType         string    `json:"sender_type"`
+       Deleted            bool      `json:"deleted"`
+       CreateTime         time.Time `json:"create_time"`
+       UpdateTime         time.Time `json:"update_time"`
+       Updated            bool      `json:"updated"`
+}
+
+// TableName returns "_tool_feishu_messages", the table name for
+// FeishuMessage.
+func (FeishuMessage) TableName() string {
+       return "_tool_feishu_messages"
+}
diff --git a/backend/plugins/feishu/tasks/api_client.go 
b/backend/plugins/feishu/tasks/api_client.go
index e015056d3..6bb3ed9fb 100644
--- a/backend/plugins/feishu/tasks/api_client.go
+++ b/backend/plugins/feishu/tasks/api_client.go
@@ -34,12 +34,12 @@ func NewFeishuApiClient(taskCtx plugin.TaskContext, 
connection *models.FeishuCon
        }
 
        // create async api client
-       asyncApiCLient, err := api.CreateAsyncApiClient(taskCtx, apiClient, 
&api.ApiRateLimitCalculator{
+       asyncApiClient, err := api.CreateAsyncApiClient(taskCtx, apiClient, 
&api.ApiRateLimitCalculator{
                UserRateLimitPerHour: connection.RateLimitPerHour,
        })
        if err != nil {
                return nil, err
        }
 
-       return asyncApiCLient, nil
+       return asyncApiClient, nil
 }
diff --git a/backend/plugins/feishu/tasks/chat_collector.go 
b/backend/plugins/feishu/tasks/chat_collector.go
new file mode 100644
index 000000000..f76e8c022
--- /dev/null
+++ b/backend/plugins/feishu/tasks/chat_collector.go
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "encoding/json"
+       "net/http"
+       "net/url"
+       "strconv"
+
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/feishu/apimodels"
+)
+
+// RAW_CHAT_TABLE is the raw-layer table name handed to both the chat
+// collector and the chat extractor.
+const RAW_CHAT_TABLE = "feishu_chat_item"
+
+// Compile-time check that CollectChat has the plugin.SubTaskEntryPoint
+// signature.
+var _ plugin.SubTaskEntryPoint = CollectChat
+
+// CollectChat collects all chats that the bot is in by paging through the
+// "im/v1/chats" endpoint, storing the raw items under RAW_CHAT_TABLE for the
+// task's connection.
+//
+// Pagination is token based: after each page GetNextPageCustomData decodes
+// has_more/page_token from the response and finishes the collection when
+// has_more is false; Query sends the token back as page_token on the next
+// request.
+func CollectChat(taskCtx plugin.SubTaskContext) errors.Error {
+       data := taskCtx.GetData().(*FeishuTaskData)
+       pageSize := 50
+       collector, err := api.NewApiCollector(api.ApiCollectorArgs{
+               RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       Params: FeishuApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                       },
+                       Table: RAW_CHAT_TABLE,
+               },
+               ApiClient:   data.ApiClient,
+               Incremental: false,
+               UrlTemplate: "im/v1/chats",
+               PageSize:    pageSize,
+               GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
+                       res := apimodels.FeishuImApiResult{}
+                       err := api.UnmarshalResponse(prevPageResponse, &res)
+                       if err != nil {
+                               return nil, err
+                       }
+                       if !res.Data.HasMore {
+                               return nil, api.ErrFinishCollect
+                       }
+                       return res.Data.PageToken, nil
+               },
+               Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
+                       query := url.Values{}
+                       query.Set("page_size", strconv.Itoa(pageSize))
+                       // Only send page_token when a previous page supplied a
+                       // non-empty one; reuse the value bound by the type
+                       // assertion instead of asserting a second time.
+                       if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
+                               query.Set("page_token", pageToken)
+                       }
+                       return query, nil
+               },
+               ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+                       body := &apimodels.FeishuImApiResult{}
+                       err := api.UnmarshalResponse(res, body)
+                       if err != nil {
+                               return nil, err
+                       }
+                       return body.Data.Items, nil
+               },
+       })
+       if err != nil {
+               return err
+       }
+
+       return collector.Execute()
+}
+
+// CollectChatMeta describes the "collectChat" subtask, whose entry point is
+// CollectChat; it is enabled by default.
+var CollectChatMeta = plugin.SubTaskMeta{
+       Name:             "collectChat",
+       EntryPoint:       CollectChat,
+       EnabledByDefault: true,
+       Description:      "Collect chats from Feishu api",
+}
diff --git a/backend/plugins/feishu/tasks/api_client.go 
b/backend/plugins/feishu/tasks/chat_extractor.go
similarity index 51%
copy from backend/plugins/feishu/tasks/api_client.go
copy to backend/plugins/feishu/tasks/chat_extractor.go
index e015056d3..abad57fe5 100644
--- a/backend/plugins/feishu/tasks/api_client.go
+++ b/backend/plugins/feishu/tasks/chat_extractor.go
@@ -18,28 +18,45 @@ limitations under the License.
 package tasks
 
 import (
+       "encoding/json"
        "github.com/apache/incubator-devlake/core/errors"
        "github.com/apache/incubator-devlake/core/plugin"
        "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
        "github.com/apache/incubator-devlake/plugins/feishu/models"
 )
 
-// const AUTH_ENDPOINT = "https://open.feishu.cn";
-// const ENDPOINT = "https://open.feishu.cn/open-apis/vc/v1";
-
-func NewFeishuApiClient(taskCtx plugin.TaskContext, connection 
*models.FeishuConnection) (*api.ApiAsyncClient, errors.Error) {
-       apiClient, err := api.NewApiClientFromConnection(taskCtx.GetContext(), 
taskCtx, connection)
-       if err != nil {
-               return nil, err
-       }
-
-       // create async api client
-       asyncApiCLient, err := api.CreateAsyncApiClient(taskCtx, apiClient, 
&api.ApiRateLimitCalculator{
-               UserRateLimitPerHour: connection.RateLimitPerHour,
+// Compile-time check that ExtractChatItem has the plugin.SubTaskEntryPoint
+// signature.
+var _ plugin.SubTaskEntryPoint = ExtractChatItem
+
+// ExtractChatItem decodes every raw row stored under RAW_CHAT_TABLE for the
+// task's connection into a models.FeishuChatItem, stamps it with the
+// connection id, and returns it as the extraction result for that row.
+func ExtractChatItem(taskCtx plugin.SubTaskContext) errors.Error {
+       data := taskCtx.GetData().(*FeishuTaskData)
+       extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
+               RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       Params: FeishuApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                       },
+                       Table: RAW_CHAT_TABLE,
+               },
+               Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
+                       body := &models.FeishuChatItem{}
+                       err := errors.Convert(json.Unmarshal(row.Data, body))
+                       if err != nil {
+                               return nil, err
+                       }
+                       body.ConnectionId = data.Options.ConnectionId
+                       return []interface{}{body}, nil
+               },
        })
        if err != nil {
-               return nil, err
+               return err
        }
 
-       return asyncApiCLient, nil
+       return extractor.Execute()
+}
+
+var ExtractChatItemMeta = plugin.SubTaskMeta{
+       Name:             "extractChatItem",
+       EntryPoint:       ExtractChatItem,
+       EnabledByDefault: true,
+       Description:      "Extract raw chats data into tool layer table 
feishu_meeting_top_user_item",
 }
diff --git a/backend/plugins/feishu/tasks/message_collector.go 
b/backend/plugins/feishu/tasks/message_collector.go
new file mode 100644
index 000000000..3823f7f09
--- /dev/null
+++ b/backend/plugins/feishu/tasks/message_collector.go
@@ -0,0 +1,119 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "encoding/json"
+       "github.com/apache/incubator-devlake/core/dal"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/feishu/apimodels"
+       "net/http"
+       "net/url"
+       "reflect"
+       "strconv"
+)
+
+// RAW_MESSAGE_TABLE is the raw-layer table name handed to both the message
+// collector and the message extractor.
+const RAW_MESSAGE_TABLE = "feishu_message"
+
+// Compile-time check that CollectMessage has the plugin.SubTaskEntryPoint
+// signature.
+var _ plugin.SubTaskEntryPoint = CollectMessage
+
+// ChatInput is the row type read from the chat cursor in CollectMessage;
+// its ChatId is sent as the container_id query parameter when listing
+// messages.
+type ChatInput struct {
+       ChatId string `json:"chat_id"`
+}
+
+// CollectMessage collects the messages of every chat already stored in
+// _tool_feishu_chats for the task's connection. Each chat id is fed to the
+// collector as input and used as container_id when paging through the
+// "im/v1/messages" endpoint; raw items are stored under RAW_MESSAGE_TABLE.
+//
+// Pagination is token based: after each page GetNextPageCustomData decodes
+// has_more/page_token from the response and finishes the current input when
+// has_more is false; Query sends the token back as page_token on the next
+// request.
+func CollectMessage(taskCtx plugin.SubTaskContext) errors.Error {
+       data := taskCtx.GetData().(*FeishuTaskData)
+       db := taskCtx.GetDal()
+
+       clauses := []dal.Clause{
+               dal.Select("chat_id AS chat_id"),
+               dal.From("_tool_feishu_chats"),
+               dal.Where("connection_id=?", data.Options.ConnectionId),
+       }
+
+       // construct the input iterator
+       cursor, err := db.Cursor(clauses...)
+       if err != nil {
+               return err
+       }
+       // smaller struct can reduce memory footprint, we should try to avoid using big struct
+       iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(ChatInput{}))
+       if err != nil {
+               return err
+       }
+
+       pageSize := 50
+       collector, err := api.NewApiCollector(api.ApiCollectorArgs{
+               RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       Params: FeishuApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                       },
+                       Table: RAW_MESSAGE_TABLE,
+               },
+               ApiClient:   data.ApiClient,
+               Incremental: false,
+               Input:       iterator,
+               UrlTemplate: "im/v1/messages",
+               PageSize:    pageSize,
+               GetNextPageCustomData: func(prevReqData *api.RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) {
+                       res := apimodels.FeishuImApiResult{}
+                       err := api.UnmarshalResponse(prevPageResponse, &res)
+                       if err != nil {
+                               return nil, err
+                       }
+                       if !res.Data.HasMore {
+                               return nil, api.ErrFinishCollect
+                       }
+                       return res.Data.PageToken, nil
+               },
+               Query: func(reqData *api.RequestData) (url.Values, errors.Error) {
+                       input := reqData.Input.(*ChatInput)
+                       query := url.Values{}
+                       query.Set("container_id_type", "chat")
+                       query.Set("container_id", input.ChatId)
+                       query.Set("page_size", strconv.Itoa(pageSize))
+                       // Only send page_token when a previous page supplied a
+                       // non-empty one; reuse the value bound by the type
+                       // assertion instead of asserting a second time.
+                       if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
+                               query.Set("page_token", pageToken)
+                       }
+                       return query, nil
+               },
+               ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+                       body := &apimodels.FeishuImApiResult{}
+                       err := api.UnmarshalResponse(res, body)
+                       if err != nil {
+                               return nil, err
+                       }
+                       return body.Data.Items, nil
+               },
+       })
+       if err != nil {
+               return err
+       }
+
+       return collector.Execute()
+}
+
+// CollectMessageMeta describes the "collectMessage" subtask, whose entry
+// point is CollectMessage; it is enabled by default.
+var CollectMessageMeta = plugin.SubTaskMeta{
+       Name:             "collectMessage",
+       EntryPoint:       CollectMessage,
+       EnabledByDefault: true,
+       Description:      "Collect message from Feishu api",
+}
diff --git a/backend/plugins/feishu/tasks/message_extractor.go 
b/backend/plugins/feishu/tasks/message_extractor.go
new file mode 100644
index 000000000..f075b61f1
--- /dev/null
+++ b/backend/plugins/feishu/tasks/message_extractor.go
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+       "encoding/json"
+       "github.com/apache/incubator-devlake/core/errors"
+       "github.com/apache/incubator-devlake/core/plugin"
+       "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+       "github.com/apache/incubator-devlake/plugins/feishu/apimodels"
+       "github.com/apache/incubator-devlake/plugins/feishu/models"
+       "strconv"
+       "time"
+)
+
+// Compile-time check that ExtractMessage has the plugin.SubTaskEntryPoint
+// signature.
+var _ plugin.SubTaskEntryPoint = ExtractMessage
+
+// ExtractMessage decodes every raw row stored under RAW_MESSAGE_TABLE for
+// the task's connection into an apimodels.FeishuMessageResultItem and maps
+// it onto a models.FeishuMessage.
+//
+// A row whose JSON cannot be decoded, or whose create_time/update_time is
+// not a base-10 integer, makes the extraction of that row return an error.
+func ExtractMessage(taskCtx plugin.SubTaskContext) errors.Error {
+       data := taskCtx.GetData().(*FeishuTaskData)
+       extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
+               RawDataSubTaskArgs: api.RawDataSubTaskArgs{
+                       Ctx: taskCtx,
+                       Params: FeishuApiParams{
+                               ConnectionId: data.Options.ConnectionId,
+                       },
+                       Table: RAW_MESSAGE_TABLE,
+               },
+               Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
+                       body := &apimodels.FeishuMessageResultItem{}
+                       err := errors.Convert(json.Unmarshal(row.Data, body))
+                       if err != nil {
+                               return nil, err
+                       }
+                       message := &models.FeishuMessage{}
+                       message.ConnectionId = data.Options.ConnectionId
+                       message.MessageId = body.MessageId
+                       message.Content = body.Body.Content
+                       message.ChatId = body.ChatId
+                       message.MsgType = body.MsgType
+                       message.ParentId = body.ParentId
+                       message.RootId = body.RootId
+                       message.SenderId = body.Sender.Id
+                       message.SenderIdType = body.Sender.IdType
+                       message.SenderType = body.Sender.SenderType
+                       message.Deleted = body.Deleted
+                       // The timestamps are passed to time.UnixMilli, so they
+                       // must be parsed as 64-bit integers: strconv.Atoi
+                       // yields a platform-sized int, which cannot hold a
+                       // current millisecond timestamp on 32-bit builds.
+                       createTimestamp, err := errors.Convert01(strconv.ParseInt(body.CreateTime, 10, 64))
+                       if err != nil {
+                               return nil, err
+                       }
+                       message.CreateTime = time.UnixMilli(createTimestamp)
+                       updateTimestamp, err := errors.Convert01(strconv.ParseInt(body.UpdateTime, 10, 64))
+                       if err != nil {
+                               return nil, err
+                       }
+                       message.UpdateTime = time.UnixMilli(updateTimestamp)
+                       message.Updated = body.Updated
+                       return []interface{}{message}, nil
+               },
+       })
+       if err != nil {
+               return err
+       }
+
+       return extractor.Execute()
+}
+
+// ExtractMessageMeta describes the "extractMessage" subtask, whose entry
+// point is ExtractMessage; it is enabled by default. The name is distinct
+// from ExtractChatItemMeta's "extractChatItem" so the two subtasks can be
+// told apart, and the description names the table that models.FeishuMessage
+// maps to.
+var ExtractMessageMeta = plugin.SubTaskMeta{
+       Name:             "extractMessage",
+       EntryPoint:       ExtractMessage,
+       EnabledByDefault: true,
+       Description:      "Extract raw messages data into tool layer table _tool_feishu_messages",
+}
diff --git a/backend/plugins/feishu/tasks/task_data.go 
b/backend/plugins/feishu/tasks/task_data.go
index 79099bc6a..428c06739 100644
--- a/backend/plugins/feishu/tasks/task_data.go
+++ b/backend/plugins/feishu/tasks/task_data.go
@@ -19,7 +19,6 @@ package tasks
 
 import (
        helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
-       "github.com/apache/incubator-devlake/plugins/feishu/models"
 )
 
 type FeishuApiParams struct {
@@ -32,7 +31,6 @@ type FeishuOptions struct {
 }
 
+// FeishuTaskData is the per-task state read by the Feishu subtasks: the task
+// options (which supply the connection id) and the async API client used by
+// the collectors.
 type FeishuTaskData struct {
-       Options                  *FeishuOptions
-       ApiClient                *helper.ApiAsyncClient
-       FeishuMeetingTopUserItem *models.FeishuMeetingTopUserItem
+       Options   *FeishuOptions
+       ApiClient *helper.ApiAsyncClient
 }

Reply via email to