github-advanced-security[bot] commented on code in PR #316: URL: https://github.com/apache/incubator-hugegraph-computer/pull/316#discussion_r1810040432
########## vermeer/apps/master/services/http_task.go: ########## @@ -0,0 +1,147 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with this +work for additional information regarding copyright ownership. The ASF +licenses this file to You under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations +under the License. +*/ + +package services + +import ( + "fmt" + "net/http" + "strconv" + "time" + "vermeer/apps/common" + "vermeer/apps/structure" + + "github.com/gin-gonic/gin" +) + +type taskInJson struct { + ID int32 `json:"id,omitempty"` + Status structure.TaskStatus `json:"status,omitempty"` + State structure.TaskState `json:"state,omitempty"` + CreateUser string `json:"create_user,omitempty"` + CreateType string `json:"create_type,omitempty"` + CreateTime time.Time `json:"create_time,omitempty"` + StartTime time.Time `json:"start_time,omitempty"` + UpdateTime time.Time `json:"update_time,omitempty"` + GraphName string `json:"graph_name,omitempty"` + SpaceName string `json:"space_name,omitempty"` + Type string `json:"task_type,omitempty"` + Params map[string]string `json:"params,omitempty"` + Workers []taskWorker `json:"workers,omitempty"` + ErrMessage string `json:"error_message,omitempty"` + StatisticsResult map[string]any `json:"statistics_result,omitempty"` +} + +type taskWorker struct { + Name string `json:"name,omitempty"` + State structure.TaskState `json:"state,omitempty"` +} + +func taskInfo2TaskJsons(tasks []*structure.TaskInfo) 
[]taskInJson { + jsons := make([]taskInJson, len(tasks)) + for i, task := range tasks { + jsons[i] = taskInfo2TaskJson(task) + } + return jsons +} + +func taskInfo2TaskJson(task *structure.TaskInfo) taskInJson { + taskInJson := taskInJson{ + ID: task.ID, + Status: task.State.Converter(), + State: task.State, + CreateUser: task.CreateUser, + CreateType: task.CreateType, + StartTime: task.StartTime, + CreateTime: task.CreateTime, + UpdateTime: task.UpdateTime, + GraphName: task.GraphName, + SpaceName: task.SpaceName, + Type: task.Type, + Workers: make([]taskWorker, 0, len(task.Workers)), + ErrMessage: task.ErrMessage, + Params: make(map[string]string, len(task.Workers)), + StatisticsResult: task.StatisticsResult, + } + + for s, s2 := range task.Params { + taskInJson.Params[s] = s2 + } + + for _, worker := range task.Workers { + taskInJson.Workers = append(taskInJson.Workers, taskWorker{ + Name: worker.Name, + State: worker.State, + }) + } + + return taskInJson +} + +type TaskHandler struct { + common.SenHandler +} + +type TaskResp struct { + common.BaseResp + Task taskInJson `json:"task,omitempty"` +} + +func (th *TaskHandler) GET(ctx *gin.Context) { + taskIDString := ctx.Param("task_id") + taskID, err := strconv.Atoi(taskIDString) + if isBad(err != nil, ctx, func() string { return fmt.Sprintf("task_id type convert to int error: %v", err) }) { + return + } + + task, err := taskBiz(ctx).GetTaskInfo(int32(taskID)) Review Comment: ## Incorrect conversion between integer types Incorrect conversion of an integer with architecture-dependent bit size from [strconv.Atoi](1) to a lower bit size type int32 without an upper bound check. [Show more details](https://github.com/apache/incubator-hugegraph-computer/security/code-scanning/20) ########## vermeer/apps/graphio/hugegraph.go: ########## @@ -0,0 +1,740 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. 
See the NOTICE file distributed with this +work for additional information regarding copyright ownership. The ASF +licenses this file to You under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations +under the License. +*/ + +package graphio + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math" + "net/http" + "strconv" + "strings" + "sync/atomic" + "time" + "vermeer/apps/common" + "vermeer/apps/options" + pd "vermeer/apps/protos/hugegraph-pd-grpc" + "vermeer/apps/protos/hugegraph-pd-grpc/metapb" + hstore "vermeer/apps/protos/hugegraph-store-grpc" + "vermeer/apps/serialize" + "vermeer/apps/structure" + + "google.golang.org/grpc/metadata" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func init() { + LoadMakers[LoadTypeHugegraph] = &HugegraphMaker{} +} + +type HugegraphMaker struct{} + +func (a *HugegraphMaker) CreateGraphLoader() GraphLoader { + return &HugegraphLoader{} +} + +func (a *HugegraphMaker) CreateGraphWriter() GraphWriter { + return &HugegraphWriter{} +} + +func (a *HugegraphMaker) MakeTasks(params map[string]string, taskID int32) ([]LoadPartition, error) { + loadParts := make([]LoadPartition, 0) + + //连接pd,获取图的分区列表 + pdIPAddress := options.GetSliceString(params, "load.hg_pd_peers") + graphName := options.GetString(params, "load.hugegraph_name") + pdIPAddr, err := common.FindValidPD(pdIPAddress) + if err != nil { + return nil, err + } + + //正式建立连接,进行查询 + ctx, cancel1 := context.WithTimeout(context.Background(), 5*time.Second) + 
defer cancel1() + pdConn, err := grpc.Dial(pdIPAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + logrus.Errorf("connect hugegraph-pb err:%v", err) + return nil, err + } + pdAuthority := common.PDAuthority{} + ctx = pdAuthority.SetAuthority(ctx) + var md metadata.MD + pdClient := pd.NewPDClient(pdConn) + partitionsResp, err := pdClient.QueryPartitions(ctx, + &pd.QueryPartitionsRequest{Query: &metapb.PartitionQuery{GraphName: &graphName}}, grpc.Header(&md)) + if err != nil { + logrus.Infof("QueryPartitions err:%v", err) + return nil, err + } + pdAuthority.SetToken(md) + + // 获取用户设置的分区列表,如果为空,则获取所有分区 + userSetPartitions := options.GetSliceInt(params, "load.hg_partitions") + userSetPartitionMap := make(map[uint32]struct{}) + for _, partitionID := range userSetPartitions { + userSetPartitionMap[uint32(partitionID)] = struct{}{} + } + logrus.Debugf("user partition:%v", userSetPartitions) + + partitions := partitionsResp.GetPartitions() + + leaderStore := make(map[uint64]string) + partID := int32(1) + for _, partition := range partitions { + // 用户设置了分区列表,则只加载用户设置的分区 + // 如果设置为空,则加载全部分区 + if len(userSetPartitionMap) != 0 { + if _, ok := userSetPartitionMap[partition.Id]; !ok { + continue + } + } + var leaderStoreID uint64 + ctx = pdAuthority.SetAuthority(ctx) + var md metadata.MD + shardGroup, err := pdClient.GetShardGroup(ctx, &pd.GetShardGroupRequest{GroupId: partition.Id}, grpc.Header(&md)) + if err != nil { + logrus.Errorf("GetShardGroup err:%v", err) + return nil, err + } + pdAuthority.SetToken(md) + shards := shardGroup.GetShardGroup().GetShards() + for _, shard := range shards { + //找到partition的leader store_id + if shard.Role == metapb.ShardRole_Leader { + leaderStoreID = shard.StoreId + break + } + } + //重复leader不再执行获取地址 + if _, ok := leaderStore[leaderStoreID]; !ok { + //获得新的leader的地址并写入map + ctx = pdAuthority.SetAuthority(ctx) + var md metadata.MD + storeResp, err := pdClient.GetStore(ctx, &pd.GetStoreRequest{StoreId: 
leaderStoreID}, grpc.Header(&md)) + if err != nil { + logrus.Errorf("GetStore %v err:%v", leaderStoreID, err) + return nil, err + } + pdAuthority.SetToken(md) + if storeResp.Store.State != metapb.StoreState_Up { + logrus.Errorf("store:%v state not up:%v", storeResp.GetStore().Address, storeResp.Store.State) + logrus.Errorf("partition id:%v not available", partition.Id) + return nil, fmt.Errorf("store:%v state not up:%v", storeResp.GetStore().Address, storeResp.Store.State) + } + leaderStore[leaderStoreID] = storeResp.GetStore().Address + } + //将一个partition细分为n个load任务(暂不支持) + partitionCount := partition.EndKey - partition.StartKey + var n uint64 = 1 + //parallel := uint64(options.GetInt(params, "load.parallel")) + //if partitionCount > parallel { + // n = parallel + //} + partCount := partitionCount / n + for i := uint64(0); i < n; i++ { + vStart := partition.StartKey + i*partCount + vEnd := vStart + partCount + if vEnd > partition.EndKey || i == n-1 { + vEnd = partition.EndKey + } + + vertexPart := LoadPartition{} + vertexPart.Init(partID, taskID, LoadPartTypeVertex) + vertexPart.Params = make(map[string]string) + vertexPart.Params["graph_name"] = graphName + vertexPart.Params["part_type"] = LoadPartTypeVertex + vertexPart.Params["partition_id"] = strconv.FormatUint(uint64(partition.Id), 10) + vertexPart.Params["store_id"] = strconv.FormatUint(leaderStoreID, 10) + vertexPart.Params["store_address"] = leaderStore[leaderStoreID] + vertexPart.Params["start_key"] = strconv.FormatUint(vStart, 10) + vertexPart.Params["end_key"] = strconv.FormatUint(vEnd, 10) + loadParts = append(loadParts, vertexPart) + partID += 1 + + edgePart := LoadPartition{} + edgePart.Init(partID, taskID, LoadPartTypeEdge) + edgePart.Params = make(map[string]string) + edgePart.Params["graph_name"] = graphName + edgePart.Params["part_type"] = LoadPartTypeEdge + edgePart.Params["partition_id"] = vertexPart.Params["partition_id"] + edgePart.Params["store_id"] = vertexPart.Params["store_id"] + 
edgePart.Params["store_address"] = leaderStore[leaderStoreID] + edgePart.Params["start_key"] = vertexPart.Params["start_key"] + edgePart.Params["end_key"] = vertexPart.Params["end_key"] + loadParts = append(loadParts, edgePart) + partID += 1 + } + } + for i := range loadParts { + loadParts[i].Params["load.hugegraph_conditions"] = params["load.hugegraph_conditions"] + loadParts[i].Params["load.vertex_property"] = params["load.vertex_property"] + loadParts[i].Params["load.edge_property"] = params["load.edge_property"] + loadParts[i].Params["load.hugegraph_vertex_condition"] = params["load.hugegraph_vertex_condition"] + loadParts[i].Params["load.hugegraph_edge_condition"] = params["load.hugegraph_edge_condition"] + loadParts[i].Params["load.hugestore_batch_timeout"] = params["load.hugestore_batch_timeout"] + loadParts[i].Params["load.hugestore_batchsize"] = params["load.hugestore_batchsize"] + } + err = pdConn.Close() + if err != nil { + logrus.Errorf("hugegraph-pd close err:%v", err) + return nil, err + } + return loadParts, nil +} + +type HugegraphLoader struct { + handler *HStoreHandler + schema structure.PropertySchema + count int + partitionID uint32 + startKey uint32 + endKey uint32 + graphName string + storeAddr string + useProperty bool + useLabel bool +} + +func (hgl *HugegraphLoader) Init(params map[string]string, schema structure.PropertySchema) error { + if options.GetInt(params, "load.use_property") == 1 { + hgl.useProperty = true + } + hgl.useLabel = false + hgl.schema = schema + hgl.graphName = options.GetString(params, "graph_name") + partitionId, err := strconv.ParseUint(options.GetString(params, "partition_id"), 10, 32) + if err != nil { + logrus.Errorf("get uint partition_id err:%v", err) + return err + } + hgl.partitionID = uint32(partitionId) + + storeAddress := options.GetString(params, "store_address") + hgl.storeAddr = storeAddress + startKey, err := strconv.ParseUint(options.GetString(params, "start_key"), 10, 64) + if err != nil { + 
logrus.Errorf("get uint start_key err:%v", err) + return err + } + hgl.startKey = uint32(startKey) Review Comment: ## Incorrect conversion between integer types Incorrect conversion of an unsigned 64-bit integer from [strconv.ParseUint](1) to a lower bit size type uint32 without an upper bound check. [Show more details](https://github.com/apache/incubator-hugegraph-computer/security/code-scanning/18) ########## vermeer/apps/master/services/http_task.go: ########## @@ -0,0 +1,147 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with this +work for additional information regarding copyright ownership. The ASF +licenses this file to You under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations +under the License. 
+*/ + +package services + +import ( + "fmt" + "net/http" + "strconv" + "time" + "vermeer/apps/common" + "vermeer/apps/structure" + + "github.com/gin-gonic/gin" +) + +type taskInJson struct { + ID int32 `json:"id,omitempty"` + Status structure.TaskStatus `json:"status,omitempty"` + State structure.TaskState `json:"state,omitempty"` + CreateUser string `json:"create_user,omitempty"` + CreateType string `json:"create_type,omitempty"` + CreateTime time.Time `json:"create_time,omitempty"` + StartTime time.Time `json:"start_time,omitempty"` + UpdateTime time.Time `json:"update_time,omitempty"` + GraphName string `json:"graph_name,omitempty"` + SpaceName string `json:"space_name,omitempty"` + Type string `json:"task_type,omitempty"` + Params map[string]string `json:"params,omitempty"` + Workers []taskWorker `json:"workers,omitempty"` + ErrMessage string `json:"error_message,omitempty"` + StatisticsResult map[string]any `json:"statistics_result,omitempty"` +} + +type taskWorker struct { + Name string `json:"name,omitempty"` + State structure.TaskState `json:"state,omitempty"` +} + +func taskInfo2TaskJsons(tasks []*structure.TaskInfo) []taskInJson { + jsons := make([]taskInJson, len(tasks)) + for i, task := range tasks { + jsons[i] = taskInfo2TaskJson(task) + } + return jsons +} + +func taskInfo2TaskJson(task *structure.TaskInfo) taskInJson { + taskInJson := taskInJson{ + ID: task.ID, + Status: task.State.Converter(), + State: task.State, + CreateUser: task.CreateUser, + CreateType: task.CreateType, + StartTime: task.StartTime, + CreateTime: task.CreateTime, + UpdateTime: task.UpdateTime, + GraphName: task.GraphName, + SpaceName: task.SpaceName, + Type: task.Type, + Workers: make([]taskWorker, 0, len(task.Workers)), + ErrMessage: task.ErrMessage, + Params: make(map[string]string, len(task.Workers)), + StatisticsResult: task.StatisticsResult, + } + + for s, s2 := range task.Params { + taskInJson.Params[s] = s2 + } + + for _, worker := range task.Workers { + 
taskInJson.Workers = append(taskInJson.Workers, taskWorker{ + Name: worker.Name, + State: worker.State, + }) + } + + return taskInJson +} + +type TaskHandler struct { + common.SenHandler +} + +type TaskResp struct { + common.BaseResp + Task taskInJson `json:"task,omitempty"` +} + +func (th *TaskHandler) GET(ctx *gin.Context) { + taskIDString := ctx.Param("task_id") + taskID, err := strconv.Atoi(taskIDString) + if isBad(err != nil, ctx, func() string { return fmt.Sprintf("task_id type convert to int error: %v", err) }) { + return + } + + task, err := taskBiz(ctx).GetTaskInfo(int32(taskID)) + if isErr(err, ctx) { + return + } + filteredTask := taskBiz(ctx).FilteringTask(task) + ctx.JSON(http.StatusOK, TaskResp{Task: taskInfo2TaskJson(filteredTask)}) +} + +func (th *TaskHandler) POST(ctx *gin.Context) { + ctx.JSON(http.StatusOK, gin.H{ + "code": http.StatusOK, + }) +} + +type TaskCancelHandler struct { + common.SenHandler +} + +type TaskCancelResp struct { + common.BaseResp +} + +func (tch *TaskCancelHandler) GET(ctx *gin.Context) { + taskIDString := ctx.Param("task_id") + taskID, err := strconv.Atoi(taskIDString) + if isBad(err != nil, ctx, func() string { return fmt.Sprintf("task_id:%s type convert to int error: %v", taskIDString, err) }) { + return + } + + //任务调度器取消任务 + err = taskBiz(ctx).CancelTask(int32(taskID)) Review Comment: ## Incorrect conversion between integer types Incorrect conversion of an integer with architecture-dependent bit size from [strconv.Atoi](1) to a lower bit size type int32 without an upper bound check. [Show more details](https://github.com/apache/incubator-hugegraph-computer/security/code-scanning/21) ########## vermeer/apps/master/threshold/store.go: ########## @@ -0,0 +1,226 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with this +work for additional information regarding copyright ownership. 
The ASF +licenses this file to You under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations +under the License. +*/ + +package threshold + +import ( + "encoding/json" + "fmt" + "path" + "reflect" + "strconv" + "strings" + "sync" + "vermeer/apps/common" + "vermeer/apps/storage" + + "github.com/sirupsen/logrus" +) + +var store storage.Store +var storeType storage.StoreType = storage.StoreTypePebble +var storeLock sync.Mutex + +// assure must to done without any error +type assure func() + +func initStore() { + p, err := common.GetCurrentPath() + + if err != nil { + panic(fmt.Errorf("failed to get current path error: %w", err)) + } + + dir := path.Join(p, "vermeer_data", "threshold_info") + + store, err = storage.StoreMaker(storage.StoreOption{ + StoreName: storage.StoreTypePebble, + Path: dir, + Fsync: true, + }) + if err != nil { + panic(fmt.Errorf("failed to initialize the kv store, caused by: %w. 
maybe another vermeer master is running", err)) + } +} + +func makeData(prefix string, key string, value any) (data map[string]any) { + data = make(map[string]any, 0) + putData(data, prefix, key, value) + return +} + +func putData(data map[string]any, prefix string, key string, value any) { + data[strings.Join([]string{prefix, key}, " ")] = value +} + +func batchSave(data map[string]any, process []assure) error { + if err := atomProcess(func() error { return saveObjects(data) }, process); err != nil { + return err + } + + return nil +} + +func atomProcess(atom func() error, process []assure) (err error) { + if err = atom(); err == nil { + for _, p := range process { + p() + } + } + return err +} + +func saveObjects(kv map[string]any) error { + storeLock.Lock() + defer storeLock.Unlock() + + batch := store.NewBatch() + + for k, v := range kv { + var bytes []byte + var err error + + if bytes, err = convertToBytes(v); err != nil { + bytes, err = json.Marshal(v) + } + + if err != nil { + return fmt.Errorf("json marshal '%s' error: %w", reflect.TypeOf(v), err) + } + + if err = batch.Set([]byte(k), bytes); err != nil { + return err + } + } + + err := batch.Commit() + + if err != nil { + return fmt.Errorf("store batch set error: %w", err) + } + + return nil +} + +func retrieveData(prefix string, valueType reflect.Kind) map[string]any { + storeLock.Lock() + defer storeLock.Unlock() + + dataMap := make(map[string]any) + dataKeys := make([]string, 0) + + for kv := range store.Scan() { + if strings.HasPrefix(string(kv.Key), prefix) { + dataKeys = append(dataKeys, string(kv.Key)) + continue + } + } + + for _, s := range dataKeys { + if buf, err := store.Get([]byte(s)); err == nil { + v := string(buf) + + if v == "" { + logrus.Warnf("data restoration was aborted due to an empty value, key: %s", s) + continue + } + + keys := strings.Split(s, " ") + keylen := len(keys) + + if keylen < 2 { + logrus.Errorf("data restoration was aborted due to an illegal length of keys, length: %d, 
keys: %s", keylen, keys) + continue + } + + dataMap[keys[1]], err = convertToType(v, valueType) + + if err != nil { + logrus.Errorf("failed to convert data value type to %v: %v", valueType, err) + continue + } + + logrus.Infof("retrieved a data entry from store, keys: %s, value: %s", keys, v) + } + + } + + return dataMap +} + +// onvertToBytes Conversion function, supports all primitive types. +func convertToBytes(value interface{}) ([]byte, error) { + switch v := value.(type) { + case int, int8, int16, int32, int64: + return []byte(fmt.Sprintf("%v", v)), nil + case uint, uint8, uint16, uint32, uint64: + return []byte(fmt.Sprintf("%v", v)), nil + case float32: + return []byte(fmt.Sprintf("%f", v)), nil + case float64: + return []byte(fmt.Sprintf("%f", v)), nil + case bool: + return []byte(fmt.Sprintf("%v", v)), nil + case string: + return []byte(v), nil + default: + return nil, fmt.Errorf("unsupported value type: %s", reflect.TypeOf(value).String()) + } +} + +// convertToType Conversion function, supports all primitive types. +func convertToType(value string, targetType reflect.Kind) (interface{}, error) { + switch targetType { + case reflect.Int: + return strconv.Atoi(value) + case reflect.Int8: + intValue, err := strconv.ParseInt(value, 10, 8) + return int8(intValue), err + case reflect.Int16: + intValue, err := strconv.ParseInt(value, 10, 16) + return int16(intValue), err + case reflect.Int32: + intValue, err := strconv.ParseInt(value, 10, 32) + return int32(intValue), err + case reflect.Int64: + return strconv.ParseInt(value, 10, 64) + case reflect.Uint: + uintValue, err := strconv.ParseUint(value, 10, 64) + return uint(uintValue), err Review Comment: ## Incorrect conversion between integer types Incorrect conversion of an unsigned 64-bit integer from [strconv.ParseUint](1) to a lower bit size type uint without an upper bound check. 
[Show more details](https://github.com/apache/incubator-hugegraph-computer/security/code-scanning/23) ########## vermeer/apps/graphio/hugegraph.go: ########## @@ -0,0 +1,740 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with this +work for additional information regarding copyright ownership. The ASF +licenses this file to You under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations +under the License. +*/ + +package graphio + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math" + "net/http" + "strconv" + "strings" + "sync/atomic" + "time" + "vermeer/apps/common" + "vermeer/apps/options" + pd "vermeer/apps/protos/hugegraph-pd-grpc" + "vermeer/apps/protos/hugegraph-pd-grpc/metapb" + hstore "vermeer/apps/protos/hugegraph-store-grpc" + "vermeer/apps/serialize" + "vermeer/apps/structure" + + "google.golang.org/grpc/metadata" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func init() { + LoadMakers[LoadTypeHugegraph] = &HugegraphMaker{} +} + +type HugegraphMaker struct{} + +func (a *HugegraphMaker) CreateGraphLoader() GraphLoader { + return &HugegraphLoader{} +} + +func (a *HugegraphMaker) CreateGraphWriter() GraphWriter { + return &HugegraphWriter{} +} + +func (a *HugegraphMaker) MakeTasks(params map[string]string, taskID int32) ([]LoadPartition, error) { + loadParts := make([]LoadPartition, 0) + + //连接pd,获取图的分区列表 + pdIPAddress := 
options.GetSliceString(params, "load.hg_pd_peers") + graphName := options.GetString(params, "load.hugegraph_name") + pdIPAddr, err := common.FindValidPD(pdIPAddress) + if err != nil { + return nil, err + } + + //正式建立连接,进行查询 + ctx, cancel1 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel1() + pdConn, err := grpc.Dial(pdIPAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + logrus.Errorf("connect hugegraph-pb err:%v", err) + return nil, err + } + pdAuthority := common.PDAuthority{} + ctx = pdAuthority.SetAuthority(ctx) + var md metadata.MD + pdClient := pd.NewPDClient(pdConn) + partitionsResp, err := pdClient.QueryPartitions(ctx, + &pd.QueryPartitionsRequest{Query: &metapb.PartitionQuery{GraphName: &graphName}}, grpc.Header(&md)) + if err != nil { + logrus.Infof("QueryPartitions err:%v", err) + return nil, err + } + pdAuthority.SetToken(md) + + // 获取用户设置的分区列表,如果为空,则获取所有分区 + userSetPartitions := options.GetSliceInt(params, "load.hg_partitions") + userSetPartitionMap := make(map[uint32]struct{}) + for _, partitionID := range userSetPartitions { + userSetPartitionMap[uint32(partitionID)] = struct{}{} + } + logrus.Debugf("user partition:%v", userSetPartitions) + + partitions := partitionsResp.GetPartitions() + + leaderStore := make(map[uint64]string) + partID := int32(1) + for _, partition := range partitions { + // 用户设置了分区列表,则只加载用户设置的分区 + // 如果设置为空,则加载全部分区 + if len(userSetPartitionMap) != 0 { + if _, ok := userSetPartitionMap[partition.Id]; !ok { + continue + } + } + var leaderStoreID uint64 + ctx = pdAuthority.SetAuthority(ctx) + var md metadata.MD + shardGroup, err := pdClient.GetShardGroup(ctx, &pd.GetShardGroupRequest{GroupId: partition.Id}, grpc.Header(&md)) + if err != nil { + logrus.Errorf("GetShardGroup err:%v", err) + return nil, err + } + pdAuthority.SetToken(md) + shards := shardGroup.GetShardGroup().GetShards() + for _, shard := range shards { + //找到partition的leader store_id + if shard.Role == 
metapb.ShardRole_Leader { + leaderStoreID = shard.StoreId + break + } + } + //重复leader不再执行获取地址 + if _, ok := leaderStore[leaderStoreID]; !ok { + //获得新的leader的地址并写入map + ctx = pdAuthority.SetAuthority(ctx) + var md metadata.MD + storeResp, err := pdClient.GetStore(ctx, &pd.GetStoreRequest{StoreId: leaderStoreID}, grpc.Header(&md)) + if err != nil { + logrus.Errorf("GetStore %v err:%v", leaderStoreID, err) + return nil, err + } + pdAuthority.SetToken(md) + if storeResp.Store.State != metapb.StoreState_Up { + logrus.Errorf("store:%v state not up:%v", storeResp.GetStore().Address, storeResp.Store.State) + logrus.Errorf("partition id:%v not available", partition.Id) + return nil, fmt.Errorf("store:%v state not up:%v", storeResp.GetStore().Address, storeResp.Store.State) + } + leaderStore[leaderStoreID] = storeResp.GetStore().Address + } + //将一个partition细分为n个load任务(暂不支持) + partitionCount := partition.EndKey - partition.StartKey + var n uint64 = 1 + //parallel := uint64(options.GetInt(params, "load.parallel")) + //if partitionCount > parallel { + // n = parallel + //} + partCount := partitionCount / n + for i := uint64(0); i < n; i++ { + vStart := partition.StartKey + i*partCount + vEnd := vStart + partCount + if vEnd > partition.EndKey || i == n-1 { + vEnd = partition.EndKey + } + + vertexPart := LoadPartition{} + vertexPart.Init(partID, taskID, LoadPartTypeVertex) + vertexPart.Params = make(map[string]string) + vertexPart.Params["graph_name"] = graphName + vertexPart.Params["part_type"] = LoadPartTypeVertex + vertexPart.Params["partition_id"] = strconv.FormatUint(uint64(partition.Id), 10) + vertexPart.Params["store_id"] = strconv.FormatUint(leaderStoreID, 10) + vertexPart.Params["store_address"] = leaderStore[leaderStoreID] + vertexPart.Params["start_key"] = strconv.FormatUint(vStart, 10) + vertexPart.Params["end_key"] = strconv.FormatUint(vEnd, 10) + loadParts = append(loadParts, vertexPart) + partID += 1 + + edgePart := LoadPartition{} + edgePart.Init(partID, taskID, 
LoadPartTypeEdge) + edgePart.Params = make(map[string]string) + edgePart.Params["graph_name"] = graphName + edgePart.Params["part_type"] = LoadPartTypeEdge + edgePart.Params["partition_id"] = vertexPart.Params["partition_id"] + edgePart.Params["store_id"] = vertexPart.Params["store_id"] + edgePart.Params["store_address"] = leaderStore[leaderStoreID] + edgePart.Params["start_key"] = vertexPart.Params["start_key"] + edgePart.Params["end_key"] = vertexPart.Params["end_key"] + loadParts = append(loadParts, edgePart) + partID += 1 + } + } + for i := range loadParts { + loadParts[i].Params["load.hugegraph_conditions"] = params["load.hugegraph_conditions"] + loadParts[i].Params["load.vertex_property"] = params["load.vertex_property"] + loadParts[i].Params["load.edge_property"] = params["load.edge_property"] + loadParts[i].Params["load.hugegraph_vertex_condition"] = params["load.hugegraph_vertex_condition"] + loadParts[i].Params["load.hugegraph_edge_condition"] = params["load.hugegraph_edge_condition"] + loadParts[i].Params["load.hugestore_batch_timeout"] = params["load.hugestore_batch_timeout"] + loadParts[i].Params["load.hugestore_batchsize"] = params["load.hugestore_batchsize"] + } + err = pdConn.Close() + if err != nil { + logrus.Errorf("hugegraph-pd close err:%v", err) + return nil, err + } + return loadParts, nil +} + +type HugegraphLoader struct { + handler *HStoreHandler + schema structure.PropertySchema + count int + partitionID uint32 + startKey uint32 + endKey uint32 + graphName string + storeAddr string + useProperty bool + useLabel bool +} + +func (hgl *HugegraphLoader) Init(params map[string]string, schema structure.PropertySchema) error { + if options.GetInt(params, "load.use_property") == 1 { + hgl.useProperty = true + } + hgl.useLabel = false + hgl.schema = schema + hgl.graphName = options.GetString(params, "graph_name") + partitionId, err := strconv.ParseUint(options.GetString(params, "partition_id"), 10, 32) + if err != nil { + logrus.Errorf("get uint 
partition_id err:%v", err) + return err + } + hgl.partitionID = uint32(partitionId) + + storeAddress := options.GetString(params, "store_address") + hgl.storeAddr = storeAddress + startKey, err := strconv.ParseUint(options.GetString(params, "start_key"), 10, 64) + if err != nil { + logrus.Errorf("get uint start_key err:%v", err) + return err + } + hgl.startKey = uint32(startKey) + + endKey, err := strconv.ParseUint(options.GetString(params, "end_key"), 10, 64) + if err != nil { + logrus.Errorf("get uint end_key err:%v", err) + return err + } + hgl.endKey = uint32(endKey) Review Comment: ## Incorrect conversion between integer types Incorrect conversion of an unsigned 64-bit integer from [strconv.ParseUint](1) to a lower bit size type uint32 without an upper bound check. [Show more details](https://github.com/apache/incubator-hugegraph-computer/security/code-scanning/19) ########## vermeer/apps/master/services/http_tasks.go: ########## @@ -0,0 +1,233 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with this +work for additional information regarding copyright ownership. The ASF +licenses this file to You under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations +under the License. +*/ + +package services + +import ( + "fmt" + "io" + "net/http" + "strconv" + "vermeer/apps/common" + "vermeer/apps/compute" + "vermeer/apps/master/bl" + . 
"vermeer/apps/master/bl" + "vermeer/apps/structure" + + "github.com/gin-gonic/gin" +) + +type TasksHandler struct { + common.SenHandler +} + +type TasksResp struct { + common.BaseResp + Tasks []taskInJson `json:"tasks,omitempty"` +} + +func (th *TasksHandler) GET(ctx *gin.Context) { + queryType := ctx.DefaultQuery("type", "all") + limit := ctx.DefaultQuery("limit", "100") + limitNum, err := strconv.Atoi(limit) + if err != nil { + ctx.JSON(http.StatusBadRequest, TasksResp{BaseResp: common.BaseResp{ErrCode: -1, Message: fmt.Errorf("limit convert to int error:%w", err).Error()}}) + } + tasks, err := taskBiz(ctx).QueryTasks(queryType, limitNum) + if isBad(err != nil, ctx, func() string { return err.Error() }) { + return + } + filteredTasks := taskBiz(ctx).FilteringTasks(tasks) + ctx.JSON(http.StatusOK, TasksResp{Tasks: taskInfo2TaskJsons(filteredTasks)}) +} + +func (th *TasksHandler) POST(ctx *gin.Context) { + ctx.JSON(http.StatusOK, gin.H{ + "code": http.StatusOK, + }) +} + +type TaskCreateHandler struct { + common.SenHandler +} + +type TaskCreateRequest struct { + TaskType string `json:"task_type"` + GraphName string `json:"graph"` + Params map[string]string `json:"params"` +} + +type TaskCreateResponse struct { + common.BaseResp + Task taskInJson `json:"task,omitempty"` +} + +// POST Create a task and execute it in queue order. +func (th *TaskCreateHandler) POST(ctx *gin.Context) { + handleTaskCreation(ctx, QueueExecuteTask) +} + +type TaskCreateSyncHandler struct { + common.SenHandler +} + +// POST Create a task, execute it immediately, and wait until it's done. 
+func (th *TaskCreateSyncHandler) POST(ctx *gin.Context) { + handleTaskCreation(ctx, SyncExecuteTask) +} + +func handleTaskCreation(ctx *gin.Context, exeFunc func(*structure.TaskInfo) error) { + req := TaskCreateRequest{} + err := ctx.BindJSON(&req) + if isBad(err != nil, ctx, func() string { return fmt.Sprintf("request body not correct: %s", err) }) { + return + } + + task, err := taskBiz(ctx).CreateTaskInfo(req.GraphName, req.TaskType, req.Params, bl.IsCheck) + + if err != ErrorTaskDuplicate { + if isBad(err != nil, ctx, func() string { return err.Error() }) { + common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(req.TaskType).Dec() + return + } + err = exeFunc(task) + if isBad(err != nil, ctx, func() string { return err.Error() }) { + common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(req.TaskType).Dec() + return + } + } + + filteredTask := taskBiz(ctx).FilteringTask(task) + ctx.JSON(http.StatusOK, TaskResp{Task: taskInfo2TaskJson(filteredTask)}) +} + +type TaskCreateBatchHandler struct { + common.SenHandler +} + +type TaskCreateBatchRequest []TaskCreateRequest + +type TaskCreateBatchResponse []TaskCreateResponse + +// POST Create batch tasks and execute it in queue order. 
+func (th *TaskCreateBatchHandler) POST(ctx *gin.Context) { + // todo: 批量任务创建 + + reqs := TaskCreateBatchRequest{} + err := ctx.BindJSON(&reqs) + if isBad(err != nil, ctx, func() string { return fmt.Sprintf("request body not correct: %s", err) }) { + return + } + resps := TaskCreateBatchResponse{} + tasks := make([]*structure.TaskInfo, 0, len((reqs))) + for _, req := range reqs { + task, err := taskBiz(ctx).CreateTaskInfo(req.GraphName, req.TaskType, req.Params, bl.NoCheck) + if err != ErrorTaskDuplicate && err != nil { + // handle error, ignore duplicate error + resps = append(resps, TaskCreateResponse{ + BaseResp: common.BaseResp{ + ErrCode: 1, + Message: err.Error(), + }, + }) + continue + } + resps = append(resps, TaskCreateResponse{}) + tasks = append(tasks, task) + } + errs := QueueExecuteTasks(tasks) + index := 0 + for i, resp := range resps { + if resp.BaseResp.ErrCode == 0 { + if errs[i] != nil { + resps[i].BaseResp.ErrCode = 1 + resps[i].BaseResp.Message = errs[i].Error() + } else { + filteredTask := taskBiz(ctx).FilteringTask(tasks[index]) + resps[i].Task = taskInfo2TaskJson(filteredTask) + } + index++ + } + } + ctx.JSON(http.StatusOK, resps) +} + +/* + ComputeValueHandler +*/ + +type ComputeValueHandler struct { + common.SenHandler +} + +type ComputeValueResponse struct { + common.BaseResp + Vertices []compute.VertexValue `json:"vertices,omitempty"` + Cursor int32 `json:"cursor,omitempty"` +} + +func (ch *ComputeValueHandler) GET(ctx *gin.Context) { + resp := ComputeValueResponse{} + var err error + limitQuery := ctx.Query("limit") + limit := 1000 + if limitQuery != "" { + limit, err = strconv.Atoi(limitQuery) + //default limit=1000 + if err != nil { + resp.BaseResp.Message = fmt.Sprintf("limit value not correct:%v , set to default value:10000", limit) + } + if limit > 100000 || limit <= 0 { + resp.BaseResp.Message = fmt.Sprintf("limit:%v, must be > 0 and <= 100,000, set to default value: 1000", limit) + limit = 1000 + } + } + + cursorQuery := 
ctx.Query("cursor") + var cursor int + if cursorQuery == "" { + cursor = 0 + } else { + cursor, err = strconv.Atoi(cursorQuery) + if isBad(err != nil, ctx, func() string { return fmt.Sprintf("query cursor not correct: %s", err) }) { + return + } + } + + taskIDString := ctx.Param("task_id") + if isBad(taskIDString == "", ctx, func() string { return fmt.Sprintf("Task_id must be available:%s", taskIDString) }) { + return + } + + taskID, err := strconv.Atoi(taskIDString) + if isBad(err != nil, ctx, func() string { return fmt.Sprintf("task_id not correct: %s", err) }) { + return + } + + //执行分页查询 + resp.Vertices, resp.Cursor, err = taskBiz(ctx).QueryResults(int32(taskID), cursor, limit) Review Comment: ## Incorrect conversion between integer types Incorrect conversion of an integer with architecture-dependent bit size from `strconv.Atoi` to a lower bit size type int32 without an upper bound check. [Show more details](https://github.com/apache/incubator-hugegraph-computer/security/code-scanning/22) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
