This is an automated email from the ASF dual-hosted git repository.

zhenyu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 22ac2228cd feat: add more support for pipeline API to Go client (#3104)
22ac2228cd is described below

commit 22ac2228cd1601951710c6d906217db960937b47
Author: wyyolo <[email protected]>
AuthorDate: Sun Aug 25 23:19:20 2024 +0800

    feat: add more support for pipeline API to Go client (#3104)
    
    * feat: add more support for pipeline API to Go client
    
    * fix: not enough arguments in call to d.executeRequest
    
    * fix: not enough arguments in call to d.executeRequest
    
    * fix: not enough arguments in call to d.executeRequest
    
    * feat: add more support for pipeline API to Go client
    
    * feat: add more support for pipeline API to Go client
    
    * feat: add more support for pipeline API to Go client
    
    * feat: add more support for pipeline API to Go client
    
    * feat: add more support for pipeline API to Go client
    
    * feat: add more support for pipeline API to Go client
---
 .../streampipes/data_lake_dashboard_api.go         |   6 +-
 .../streampipes/data_lake_measure_api.go           |  18 +-
 .../streampipes/data_lake_widget_api.go            |   6 +-
 streampipes-client-go/streampipes/endpoint.go      |  39 +--
 streampipes-client-go/streampipes/functions_api.go |   8 +-
 .../internal/serializer/deserializer.go            |  62 +++++
 .../streampipes/internal/serializer/serializer.go  |  37 +++
 streampipes-client-go/streampipes/model/common.go  |  87 ++++++-
 .../streampipes/model/pipeline/pipeline.go         | 163 ++++++++++++
 streampipes-client-go/streampipes/pipeline_api.go  | 274 ++++++++++++++++++++-
 .../streampipes/streampipes_version_api.go         |   2 +-
 streampipes-client-go/streampipes/user_api.go      |   6 +-
 12 files changed, 661 insertions(+), 47 deletions(-)

diff --git a/streampipes-client-go/streampipes/data_lake_dashboard_api.go 
b/streampipes-client-go/streampipes/data_lake_dashboard_api.go
index 5c08c29fdb..728e5f7f74 100644
--- a/streampipes-client-go/streampipes/data_lake_dashboard_api.go
+++ b/streampipes-client-go/streampipes/data_lake_dashboard_api.go
@@ -43,7 +43,7 @@ func (d *DataLakeDashboard) 
GetSingleDataLakeDashboard(dashboardId string) (data
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId})
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("GET", endPointUrl)
+       response, err := d.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return data_lake.Dashboard{}, err
        }
@@ -73,7 +73,7 @@ func (d *DataLakeDashboard) GetAllDataLakeDashboard() 
([]data_lake.Dashboard, er
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v3/datalake/dashboard", nil)
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("GET", endPointUrl)
+       response, err := d.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return nil, err
        }
@@ -103,7 +103,7 @@ func (d *DataLakeDashboard) 
DeleteSingleDataLakeDashboard(dashboardId string) er
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId})
        log.Printf("Delete data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("DELETE", endPointUrl)
+       response, err := d.executeRequest("DELETE", endPointUrl, nil)
        if err != nil {
                return err
        }
diff --git a/streampipes-client-go/streampipes/data_lake_measure_api.go 
b/streampipes-client-go/streampipes/data_lake_measure_api.go
index d4719b81b2..012e7d1030 100644
--- a/streampipes-client-go/streampipes/data_lake_measure_api.go
+++ b/streampipes-client-go/streampipes/data_lake_measure_api.go
@@ -43,13 +43,13 @@ func NewDataLakeMeasures(clientConfig 
config.StreamPipesClientConfig) *DataLakeM
        }
 }
 
-// AllDataLakeMeasure retrieves a list of all measurements series from the 
Data Lake.
-func (d *DataLakeMeasure) AllDataLakeMeasure() ([]data_lake.DataLakeMeasure, 
error) {
+// GetAllDataLakeMeasure retrieves a list of all measurements series from the 
Data Lake.
+func (d *DataLakeMeasure) GetAllDataLakeMeasure() 
([]data_lake.DataLakeMeasure, error) {
 
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v4/datalake/measurements", nil)
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("GET", endPointUrl)
+       response, err := d.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return nil, err
        }
@@ -81,7 +81,7 @@ func (d *DataLakeMeasure) DeleteDataLakeMeasurements() error {
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v4/datalake/measurements", nil)
        log.Printf("Delete data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("DELETE", endPointUrl)
+       response, err := d.executeRequest("DELETE", endPointUrl, nil)
        if err != nil {
                return err
        }
@@ -102,7 +102,7 @@ func (d *DataLakeMeasure) 
GetSingleDataLakeMeasure(elementId string) (data_lake.
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v4/datalake/measure", []string{elementId})
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("GET", endPointUrl)
+       response, err := d.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return data_lake.DataLakeMeasure{}, err
        }
@@ -134,7 +134,7 @@ func (d *DataLakeMeasure) 
DeleteSingleDataLakeMeasure(elementId string) error {
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v4/datalake/measure", []string{elementId})
        log.Printf("Delete data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("DELETE", endPointUrl)
+       response, err := d.executeRequest("DELETE", endPointUrl, nil)
        if err != nil {
                return err
        }
@@ -157,7 +157,7 @@ func (d *DataLakeMeasure) GetSingleDataSeries(measureId 
string) (*data_lake.Data
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v4/datalake/measurements", []string{measureId})
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("GET", endPointUrl)
+       response, err := d.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return nil, err
        }
@@ -190,7 +190,7 @@ func (d *DataLakeMeasure) 
ClearDataLakeMeasureData(measureId string) error {
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v4/datalake/measurements", []string{measureId})
        log.Printf("Clear data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("DELETE", endPointUrl)
+       response, err := d.executeRequest("DELETE", endPointUrl, nil)
        if err != nil {
                return err
        }
@@ -212,7 +212,7 @@ func (d *DataLakeMeasure) DeleteDataLakeMeasure(measureId 
string) error {
 
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v4/datalake/measurements", []string{measureId, "drop"})
        log.Printf("Delete data from: %s", endPointUrl)
-       response, err := d.executeRequest("DELETE", endPointUrl)
+       response, err := d.executeRequest("DELETE", endPointUrl, nil)
        if err != nil {
                return err
        }
diff --git a/streampipes-client-go/streampipes/data_lake_widget_api.go 
b/streampipes-client-go/streampipes/data_lake_widget_api.go
index 4988408f33..f010dd77bd 100644
--- a/streampipes-client-go/streampipes/data_lake_widget_api.go
+++ b/streampipes-client-go/streampipes/data_lake_widget_api.go
@@ -44,7 +44,7 @@ func (d *DataLakeWidget) GetSingleDataLakeWidget(widgetId 
string) (data_lake.Dat
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId})
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("GET", endPointUrl)
+       response, err := d.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return data_lake.DataExplorerWidgetModel{}, err
        }
@@ -75,7 +75,7 @@ func (d *DataLakeWidget) GetAllDataLakeWidget() 
([]data_lake.DataExplorerWidgetM
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v3/datalake/dashboard/widgets", nil)
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("GET", endPointUrl)
+       response, err := d.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return nil, err
        }
@@ -105,7 +105,7 @@ func (d *DataLakeWidget) 
DeleteSingleDataLakeWidget(widgetId string) error {
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId})
        log.Printf("Delete data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("DELETE", endPointUrl)
+       response, err := d.executeRequest("DELETE", endPointUrl, nil)
        if err != nil {
                return err
        }
diff --git a/streampipes-client-go/streampipes/endpoint.go 
b/streampipes-client-go/streampipes/endpoint.go
index 41384b7e91..5fccd5f3a5 100644
--- a/streampipes-client-go/streampipes/endpoint.go
+++ b/streampipes-client-go/streampipes/endpoint.go
@@ -18,20 +18,25 @@
 package streampipes
 
 import (
-       "errors"
-       "net/http"
-
+       "bytes"
+       "fmt"
        "github.com/apache/streampipes/streampipes-client-go/streampipes/config"
        headers 
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/http_headers"
+       "io"
+       "net/http"
 )
 
 type endpoint struct {
        config config.StreamPipesClientConfig
 }
 
-func (e *endpoint) executeRequest(method string, endPointUrl string) 
(*http.Response, error) {
+func (e *endpoint) executeRequest(method string, endPointUrl string, body 
[]byte) (*http.Response, error) {
 
-       req, err := http.NewRequest(method, endPointUrl, nil)
+       var reader io.Reader
+       if body != nil {
+               reader = bytes.NewReader(body)
+       }
+       req, err := http.NewRequest(method, endPointUrl, reader)
        if err != nil {
                return nil, err
        }
@@ -53,23 +58,23 @@ func (e *endpoint) handleStatusCode(resp *http.Response) 
error {
 
        switch resp.StatusCode {
        case http.StatusUnauthorized:
-               return errors.New("The streamPipes Backend returned an 
unauthorized error.\nplease check your ApiUser and/or Apikey to be correct.")
+               return fmt.Errorf("response code %d:"+"The streamPipes Backend 
returned an unauthorized error.\nplease check your ApiUser and/or Apikey to be 
correct.", resp.StatusCode)
        case http.StatusForbidden:
-               return errors.New("There seems to be an issue with the access 
rights of the given user and the resource you queried.\n" +
-                       "Apparently, this user is not allowed to query the 
resource.\n" +
-                       "Please check the user's permissions or contact your 
StreamPipes admin.")
+               return fmt.Errorf("response code %d:"+"There seems to be an 
issue with the access rights of the given user and the resource you queried.\n"+
+                       "Apparently, this user is not allowed to query the 
resource.\n"+
+                       "Please check the user's permissions or contact your 
StreamPipes admin.", resp.StatusCode)
        case http.StatusNotFound:
-               return errors.New("There seems to be an issue with the Go 
Client calling the API inappropriately.\n" +
-                       "This should not happen, but unfortunately did.\n" +
-                       "If you don't mind, it would be awesome to let us know 
by creating an issue at https://github.com/apache/streampipes.\n")
+               return fmt.Errorf("response code %d:"+"There seems to be an 
issue with the Go Client calling the API inappropriately.\n"+
+                       "This should not happen, but unfortunately did.\n"+
+                       "If you don't mind, it would be awesome to let us know 
by creating an issue at https://github.com/apache/streampipes.\n", 
resp.StatusCode)
        case http.StatusMethodNotAllowed:
-               return errors.New("There seems to be an issue with the Go 
Client calling the API inappropriately.\n" +
-                       "This should not happen, but unfortunately did.\n" +
-                       "If you don't mind, it would be awesome to let us know 
by creating an issue at https://github.com/apache/streampipes.\n")
+               return fmt.Errorf("response code %d:"+"There seems to be an 
issue with the Go Client calling the API inappropriately.\n"+
+                       "This should not happen, but unfortunately did.\n"+
+                       "If you don't mind, it would be awesome to let us know 
by creating an issue at https://github.com/apache/streampipes.\n", 
resp.StatusCode)
        case http.StatusInternalServerError:
-               return errors.New("streamPipes internal error")
+               return fmt.Errorf("response code %d:"+"streamPipes internal 
error", resp.StatusCode)
        default:
-               return errors.New(resp.Status)
+               return fmt.Errorf(resp.Status)
        }
 
 }
diff --git a/streampipes-client-go/streampipes/functions_api.go 
b/streampipes-client-go/streampipes/functions_api.go
index 3b696753aa..14375f3e1e 100644
--- a/streampipes-client-go/streampipes/functions_api.go
+++ b/streampipes-client-go/streampipes/functions_api.go
@@ -44,7 +44,7 @@ func (f *Functions) GetFunctionLogs(functionId string) 
([]functions.SpLogEntry,
        endPointUrl := util.NewStreamPipesApiPath(f.config.Url, 
"streampipes-backend/api/v2/functions", []string{functionId, "logs"})
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := f.executeRequest("GET", endPointUrl)
+       response, err := f.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return nil, err
        }
@@ -75,7 +75,7 @@ func (f *Functions) GetFunctionMetrics(functionId string) 
(functions.SpMetricsEn
        endPointUrl := util.NewStreamPipesApiPath(f.config.Url, 
"streampipes-backend/api/v2/functions", []string{functionId, "metrics"})
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := f.executeRequest("GET", endPointUrl)
+       response, err := f.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return functions.SpMetricsEntry{}, err
        }
@@ -106,7 +106,7 @@ func (f *Functions) GetAllFunction() 
([]functions.FunctionDefinition, error) {
        endPointUrl := util.NewStreamPipesApiPath(f.config.Url, 
"streampipes-backend/api/v2/functions", nil)
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := f.executeRequest("GET", endPointUrl)
+       response, err := f.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return nil, err
        }
@@ -136,7 +136,7 @@ func (f *Functions) DeleteSingleFunction(functionId string) 
error {
        endPointUrl := util.NewStreamPipesApiPath(f.config.Url, 
"streampipes-backend/api/v2/functions", []string{functionId})
        log.Printf("Delete data from: %s", endPointUrl)
 
-       response, err := f.executeRequest("DELETE", endPointUrl)
+       response, err := f.executeRequest("DELETE", endPointUrl, nil)
        if err != nil {
                return err
        }
diff --git 
a/streampipes-client-go/streampipes/internal/serializer/deserializer.go 
b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
index a6007b9e86..fd9bb40eb3 100644
--- a/streampipes-client-go/streampipes/internal/serializer/deserializer.go
+++ b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
@@ -267,3 +267,65 @@ func (p *UserAccountDeserializer) Unmarshal(data []byte) 
(interface{}, error) {
        return userAccount, nil
 
 }
+
+type PipelineDeserializer struct{}
+
+func NewPipelineDeserializer() *PipelineDeserializer {
+       return &PipelineDeserializer{}
+}
+
+func (p *PipelineDeserializer) Unmarshal(data []byte) (interface{}, error) {
+       var pipeLine pipeline.Pipeline
+       err := json.Unmarshal(data, &pipeLine)
+       if err != nil {
+               return nil, err
+       }
+       return pipeLine, nil
+
+}
+
+type PipelinesDeserializer struct{}
+
+func NewPipelinesDeserializer() *PipelinesDeserializer {
+       return &PipelinesDeserializer{}
+}
+
+func (p *PipelinesDeserializer) Unmarshal(data []byte) (interface{}, error) {
+       var pipelines []pipeline.Pipeline
+       err := json.Unmarshal(data, &pipelines)
+       if err != nil {
+               return nil, err
+       }
+       return pipelines, nil
+
+}
+
+type PipelineStatusMessagesDeserializer struct{}
+
+func NewPipelineStatusMessagesDeserializer() 
*PipelineStatusMessagesDeserializer {
+       return &PipelineStatusMessagesDeserializer{}
+}
+
+func (p *PipelineStatusMessagesDeserializer) Unmarshal(data []byte) 
(interface{}, error) {
+       var pipelineStatusMessage []pipeline.PipelineStatusMessage
+       err := json.Unmarshal(data, &pipelineStatusMessage)
+       if err != nil {
+               return nil, err
+       }
+       return pipelineStatusMessage, nil
+}
+
+type PipelineOperationStatusDeserializer struct{}
+
+func NewPipelineOperationStatusDeserializer() 
*PipelineOperationStatusDeserializer {
+       return &PipelineOperationStatusDeserializer{}
+}
+
+func (p *PipelineOperationStatusDeserializer) Unmarshal(data []byte) 
(interface{}, error) {
+       var pipelineOperationStatus pipeline.PipelineOperationStatus
+       err := json.Unmarshal(data, &pipelineOperationStatus)
+       if err != nil {
+               return nil, err
+       }
+       return pipelineOperationStatus, nil
+}
diff --git 
a/streampipes-client-go/streampipes/internal/serializer/serializer.go 
b/streampipes-client-go/streampipes/internal/serializer/serializer.go
new file mode 100644
index 0000000000..24439a164b
--- /dev/null
+++ b/streampipes-client-go/streampipes/internal/serializer/serializer.go
@@ -0,0 +1,37 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package serializer
+
+import (
+       "encoding/json"
+       
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/pipeline"
+)
+
+type PipelineSerializer struct{}
+
+func NewPipelineSerializer() PipelineSerializer {
+       return PipelineSerializer{}
+}
+
+func (p PipelineSerializer) Marshal(pp pipeline.Pipeline) ([]byte, error) {
+       data, err := json.Marshal(pp)
+       if err != nil {
+               return nil, err
+       }
+       return data, nil
+}
diff --git a/streampipes-client-go/streampipes/model/common.go 
b/streampipes-client-go/streampipes/model/common.go
index 5a853fee55..7d71666411 100644
--- a/streampipes-client-go/streampipes/model/common.go
+++ b/streampipes-client-go/streampipes/model/common.go
@@ -65,12 +65,91 @@ type DataSeries struct {
 
 type ResponseMessage struct {
        Success       bool           `json:"success"`
-       ElementName   interface{}    `json:"elementName"`
+       ElementName   string         `json:"elementName"`
        Notifications []Notification `json:"notifications"`
 }
 
 type Notification struct {
-       Title                 string `json:"title"`
-       Description           string `json:"description"`
-       AdditionalInformation string `json:"additionalInformation"`
+       Title                 string      `json:"title"`
+       Description           interface{} `json:"description"`
+       AdditionalInformation string      `json:"additionalInformation"`
+}
+
+type StaticPropertyType string
+
+const (
+       AnyStaticProperty                        StaticPropertyType = 
"AnyStaticProperty"
+       CodeInputStaticProperty                  StaticPropertyType = 
"CodeInputStaticProperty"
+       CollectionStaticProperty                 StaticPropertyType = 
"CollectionStaticProperty"
+       ColorPickerStaticProperty                StaticPropertyType = 
"ColorPickerStaticProperty"
+       DomainStaticProperty                     StaticPropertyType = 
"DomainStaticProperty"
+       FreeTextStaticProperty                   StaticPropertyType = 
"FreeTextStaticProperty"
+       FileStaticProperty                       StaticPropertyType = 
"FileStaticProperty"
+       MappingPropertyUnary                     StaticPropertyType = 
"MappingPropertyUnary"
+       MappingPropertyNary                      StaticPropertyType = 
"MappingPropertyNary"
+       MatchingStaticProperty                   StaticPropertyType = 
"MatchingStaticProperty"
+       OneOfStaticProperty                      StaticPropertyType = 
"OneOfStaticProperty"
+       RuntimeResolvableAnyStaticProperty       StaticPropertyType = 
"RuntimeResolvableAnyStaticProperty"
+       RuntimeResolvableGroupStaticProperty     StaticPropertyType = 
"RuntimeResolvableGroupStaticProperty"
+       RuntimeResolvableOneOfStaticProperty     StaticPropertyType = 
"RuntimeResolvableOneOfStaticProperty"
+       RuntimeResolvableTreeInputStaticProperty StaticPropertyType = 
"RuntimeResolvableTreeInputStaticProperty"
+       StaticPropertyGroup                      StaticPropertyType = 
"StaticPropertyGroup"
+       StaticPropertyAlternatives               StaticPropertyType = 
"StaticPropertyAlternatives"
+       StaticPropertyAlternative                StaticPropertyType = 
"StaticPropertyAlternative"
+       SecretStaticProperty                     StaticPropertyType = 
"SecretStaticProperty"
+       SlideToggleStaticProperty                StaticPropertyType = 
"SlideToggleStaticProperty"
+)
+
+type StaticProperty struct {
+       Optional           bool               `json:"optional,omitempty"`
+       StaticPropertyType StaticPropertyType `json:"staticPropertyType"`
+       Index              int32              `json:"index"`
+       Label              string             `json:"label"`
+       Description        string             `json:"description"`
+       InternalName       string             `json:"internalName"`
+       Predefined         bool               `json:"predefined"`
+       Class              string             `json:"@class"`
+}
+
+type SpDataStream struct {
+       ElementId              string         `json:"elementId"`
+       Dom                    string         `json:"dom"`
+       ConnectedTo            []string       `json:"connectedTo"`
+       Name                   string         `json:"name"`
+       Description            string         `json:"description"`
+       IconUrl                string         `json:"iconUrl"`
+       AppId                  string         `json:"appId"`
+       IncludesAssets         bool           `json:"includesAssets"`
+       IncludesLocales        bool           `json:"includesLocales"`
+       IncludedAssets         []string       `json:"includedAssets"`
+       IncludedLocales        []string       `json:"includedLocales"`
+       InternallyManaged      bool           `json:"internallyManaged"`
+       EventGrounding         EventGrounding `json:"eventGrounding"`
+       EventSchema            EventSchema    `json:"eventSchema"`
+       Category               []string       `json:"category"`
+       Index                  int32          `json:"index"`
+       CorrespondingAdapterId string         `json:"correspondingAdapterId"`
+       Rev                    string         `json:"_rev"`
+}
+
+type EventGrounding struct {
+       TransportProtocols []TransportProtocol `json:"transportProtocols"`
+       TransportFormats   []TransportFormat   `json:"transportFormats"`
+}
+
+type TransportProtocol struct {
+       ElementId       string          `json:"elementId"`
+       BrokerHostname  string          `json:"brokerHostname"`
+       TopicDefinition TopicDefinition `json:"topicDefinition"`
+       Class           string          `json:"@class,omitempty"`
+       Port            int             `json:"port"`
+}
+
+type TopicDefinition struct {
+       ActualTopicName string `json:"actualTopicName"`
+       Class           string `json:"@class"`
+}
+
+type TransportFormat struct {
+       RdfType []string `json:"rdfType"`
 }
diff --git a/streampipes-client-go/streampipes/model/pipeline/pipeline.go 
b/streampipes-client-go/streampipes/model/pipeline/pipeline.go
new file mode 100644
index 0000000000..0586be659d
--- /dev/null
+++ b/streampipes-client-go/streampipes/model/pipeline/pipeline.go
@@ -0,0 +1,163 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package pipeline
+
+import (
+       "github.com/apache/streampipes/streampipes-client-go/streampipes/model"
+)
+
+type PipelineHealthStatus string
+
+const (
+       OK                 PipelineHealthStatus = "OK"
+       REQUIRES_ATTENTION PipelineHealthStatus = "REQUIRES_ATTENTION"
+       FAILURE            PipelineHealthStatus = "FAILURE"
+)
+
+type Pipeline struct {
+       Sepas                 []DataProcessorInvocation //`json:"sepas"`
+       Streams               []model.SpDataStream      //`json:"streams"`
+       Name                  string                    //`json:"name"`
+       Description           string                    
//`json:"description,omitempty"`
+       Actions               []DataSinkInvocation      //`json:"actions"`
+       Running               bool                      //`json:"running"`
+       RestartOnSystemReboot bool                      
//`json:"restartOnSystemReboot"`
+       Valid                 bool                      //`json:"valid"`
+       StartedAt             int64                     
//`json:"startedAt,omitempty"`
+       CreatedAt             int64                     //`json:"createdAt"`
+       PublicElement         bool                      //`json:"publicElement"`
+       CreatedByUser         string                    //`json:"createdByUser"`
+       PipelineCategories    []string                  
//`json:"pipelineCategories"`
+       PipelineNotifications []string                  
//`json:"pipelineNotifications"`
+       HealthStatus          PipelineHealthStatus      //`json:"healthStatus"`
+       ID                    string                    //`json:"_id,omitempty"`
+       Rev                   string                    
//`json:"_rev,omitempty"`
+}
+
+type DataProcessorInvocation struct {
+       ElementId             string                    `json:"elementId"`
+       Dom                   string                    `json:"dom"`
+       ConnectedTo           []string                  `json:"connectedTo"`
+       Name                  string                    `json:"name"`
+       Description           string                    `json:"description"`
+       IconUrl               string                    `json:"iconUrl"`
+       AppId                 string                    `json:"appId"`
+       IncludesAssets        bool                      `json:"includesAssets"`
+       IncludesLocales       bool                      `json:"includesLocales"`
+       IncludedAssets        []string                  `json:"includedAssets"`
+       IncludedLocales       []string                  `json:"includedLocales"`
+       InternallyManaged     bool                      
`json:"internallyManaged"`
+       Version               int32                     `json:"version"`
+       InputStreams          []model.SpDataStream      `json:"inputStreams"`
+       StaticProperties      []model.StaticProperty    
`json:"staticProperties"`
+       BelongsTo             string                    `json:"belongsTo"`
+       StatusInfoSettings    ElementStatusInfoSettings 
`json:"statusInfoSettings"`
+       SupportedGrounding    model.EventGrounding      
`json:"supportedGrounding"`
+       CorrespondingPipeline string                    
`json:"correspondingPipeline"`
+       CorresponddingUser    string                    
`json:"correspondingUser"`
+       StreamRequirements    []model.SpDataStream      
`json:"streamRequirements"`
+       Configured            bool                      `json:"configured"`
+       Uncompleted           bool                      `json:"uncompleted"`
+       SelectedEndpointUrl   string                    
`json:"selectedEndpointUrl"`
+       OutputStream          model.SpDataStream        `json:"outputStream"`
+       OutputStrategies      []OutputStrategy          
`json:"outputStrategies"`
+       PathName              string                    `json:"pathName"`
+       Category              []string                  `json:"category"`
+       Rev                   string                    `json:"_rev"`
+}
+
+type ElementStatusInfoSettings struct {
+       ElementIdentifier string `json:"elementIdentifier"`
+       KafkaHost         string `json:"kafkaHost"`
+       KafkaPort         int32  `json:"kafkaPort"`
+       ErrorTopic        string `json:"errorTopic"`
+       StatsTopic        string `json:"statsTopic"`
+}
+
+type OutputStrategy struct {
+       Name        string               `json:"name"`
+       RenameRules []PropertyRenameRule `json:"renameRules"`
+       Class       string               `json:"class,omitempty"`
+}
+
+type PropertyRenameRule struct {
+       RuntimeID      string `json:"runtimeId"`
+       NewRuntimeName string `json:"newRuntimeName"`
+}
+
+type DataSinkInvocation struct {
+       ElementId             string                    `json:"elementId"`
+       Dom                   string                    `json:"dom"`
+       ConnectedTo           []string                  `json:"connectedTo"`
+       Name                  string                    `json:"name"`
+       Description           string                    `json:"description"`
+       IconUrl               string                    `json:"iconUrl"`
+       AppId                 string                    `json:"appId"`
+       IncludesAssets        bool                      `json:"includesAssets"`
+       IncludesLocales       bool                      `json:"includesLocales"`
+       IncludedAssets        []string                  `json:"includedAssets"`
+       IncludedLocales       []string                  `json:"includedLocales"`
+       InternallyManaged     bool                      
`json:"internallyManaged"`
+       Version               int32                     `json:"version"`
+       InputStreams          []model.SpDataStream      `json:"inputStreams"`
+       StaticProperties      []model.StaticProperty    
`json:"staticProperties"`
+       BelongsTo             string                    `json:"belongsTo"`
+       StatusInfoSettings    ElementStatusInfoSettings 
`json:"statusInfoSettings"`
+       SupportedGrounding    model.EventGrounding      
`json:"supportedGrounding"`
+       CorrespondingPipeline string                    
`json:"correspondingPipeline"`
+       CorrespondingUser     string                    
`json:"correspondingUser"`
+       StreamRequirements    []model.SpDataStream      
`json:"streamRequirements"`
+       Configured            bool                      `json:"configured"`
+       Uncompleted           bool                      `json:"uncompleted"`
+       SelectedEndpointUrl   string                    
`json:"selectedEndpointUrl"`
+       Category              []string                  `json:"category"`
+       Rev                   string                    `json:"_rev"`
+}
+
+type PipelineElementStatus struct {
+       ElementID       string `json:"elementId"`
+       ElementName     string `json:"elementName"`
+       OptionalMessage string `json:"optionalMessage"`
+       Success         bool   `json:"success"`
+}
+
+type PipelineOperationStatus struct {
+       PipelineId    string                  `json:"pipelineId"`
+       PipelineName  string                  `json:"pipelineName"`
+       Title         string                  `json:"title"`
+       Success       bool                    `json:"success"`
+       ElementStatus []PipelineElementStatus `json:"elementStatus"`
+}
+
+type PipelineStatusMessage struct {
+       PipelineId  string `json:"pipelineId"`
+       Timestamp   int64  `json:"timestamp"`
+       MessageType string `json:"messageType"`
+       Message     string `json:"message"`
+}
+type PipelineElementValidationLevel string
+
+const (
+       ValidationInfo  PipelineElementValidationLevel = "INFO"
+       ValidationError PipelineElementValidationLevel = "ERROR"
+)
+
+type PipelineElementValidationInfo struct {
+       Level   PipelineElementValidationLevel `json:"level"`
+       Message string                         `json:"message"`
+}
diff --git a/streampipes-client-go/streampipes/pipeline_api.go 
b/streampipes-client-go/streampipes/pipeline_api.go
index ba1e49f7fe..e58ccc7096 100644
--- a/streampipes-client-go/streampipes/pipeline_api.go
+++ b/streampipes-client-go/streampipes/pipeline_api.go
@@ -18,10 +18,10 @@
 package streampipes
 
 import (
+       "fmt"
        "io"
        "log"
        "net/http"
-
        "github.com/apache/streampipes/streampipes-client-go/streampipes/config"
        
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/serializer"
        
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/util"
@@ -40,12 +40,280 @@ func NewPipeline(clientConfig 
config.StreamPipesClientConfig) *Pipeline {
        }
 }
 
+// GetSinglePipeline returns the pipeline with the given id.
+//
+// It sends a GET request to the pipelines endpoint and decodes the response
+// body with the pipeline deserializer. On any failure (request, status
+// handling, reading the body, decoding) it returns the zero-value Pipeline
+// and a non-nil error.
+func (p *Pipeline) GetSinglePipeline(pipelineId string) (pipeline.Pipeline, error) {
+       endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId})
+       log.Printf("Get data from: %s", endPointUrl)
+
+       response, err := p.executeRequest("GET", endPointUrl, nil)
+       if err != nil {
+               return pipeline.Pipeline{}, err
+       }
+       // Release the underlying connection on every return path below.
+       defer response.Body.Close()
+
+       if response.StatusCode != http.StatusOK {
+               if err = p.handleStatusCode(response); err != nil {
+                       return pipeline.Pipeline{}, err
+               }
+       }
+
+       body, err := io.ReadAll(response.Body)
+       if err != nil {
+               return pipeline.Pipeline{}, err
+       }
+
+       unmarshalData, err := serializer.NewPipelineDeserializer().Unmarshal(body)
+       if err != nil {
+               return pipeline.Pipeline{}, err
+       }
+
+       // Checked assertion: an unexpected deserializer result becomes an error
+       // instead of a panic inside library code.
+       pipeLine, ok := unmarshalData.(pipeline.Pipeline)
+       if !ok {
+               return pipeline.Pipeline{}, fmt.Errorf("unexpected type %T from pipeline deserializer", unmarshalData)
+       }
+
+       return pipeLine, nil
+}
+
+// DeleteSinglePipeline deletes the pipeline with the given id.
+//
+// It sends a DELETE request to the pipelines endpoint. A non-nil error is
+// returned when the request fails or when handleStatusCode reports an error
+// for a non-200 response; the response body itself is not inspected.
+func (p *Pipeline) DeleteSinglePipeline(pipelineId string) error {
+       endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId})
+       log.Printf("Delete data from: %s", endPointUrl)
+
+       response, err := p.executeRequest("DELETE", endPointUrl, nil)
+       if err != nil {
+               return err
+       }
+       // The body is not read here, but it still has to be closed so the
+       // connection is not leaked.
+       defer response.Body.Close()
+
+       if response.StatusCode != http.StatusOK {
+               if err = p.handleStatusCode(response); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// UpdateSinglePipeline updates the existing pipeline identified by pipelineId.
+//
+// pp is serialized with the pipeline serializer and sent as the body of a PUT
+// request; the response body is decoded into a model.ResponseMessage. If the
+// update is rejected, a likely cause is that the pipeline passed in is
+// incorrect. On any failure the zero-value ResponseMessage and a non-nil
+// error are returned.
+func (p *Pipeline) UpdateSinglePipeline(pp pipeline.Pipeline, pipelineId string) (model.ResponseMessage, error) {
+       endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId})
+
+       body, err := serializer.NewPipelineSerializer().Marshal(pp)
+       if err != nil {
+               return model.ResponseMessage{}, err
+       }
+
+       response, err := p.executeRequest("PUT", endPointUrl, body)
+       if err != nil {
+               return model.ResponseMessage{}, err
+       }
+       // Release the underlying connection on every return path below.
+       defer response.Body.Close()
+
+       if response.StatusCode != http.StatusOK {
+               if err = p.handleStatusCode(response); err != nil {
+                       return model.ResponseMessage{}, err
+               }
+       }
+
+       data, err := io.ReadAll(response.Body)
+       if err != nil {
+               return model.ResponseMessage{}, err
+       }
+
+       unmarshalData, err := serializer.NewResponseMessageDeserializer().Unmarshal(data)
+       if err != nil {
+               return model.ResponseMessage{}, err
+       }
+
+       // Checked assertion: an unexpected deserializer result becomes an error
+       // instead of a panic inside library code.
+       message, ok := unmarshalData.(model.ResponseMessage)
+       if !ok {
+               return model.ResponseMessage{}, fmt.Errorf("unexpected type %T from response message deserializer", unmarshalData)
+       }
+
+       return message, nil
+}
+
+// GetAllPipeline returns all pipelines of the current user.
+//
+// It sends a GET request to the pipelines endpoint without any path
+// parameters and decodes the response body with the pipelines deserializer.
+// On any failure it returns a nil slice and a non-nil error.
+func (p *Pipeline) GetAllPipeline() ([]pipeline.Pipeline, error) {
+       endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", nil)
+
+       response, err := p.executeRequest("GET", endPointUrl, nil)
+       if err != nil {
+               return nil, err
+       }
+       // Release the underlying connection on every return path below.
+       defer response.Body.Close()
+
+       if response.StatusCode != http.StatusOK {
+               if err = p.handleStatusCode(response); err != nil {
+                       return nil, err
+               }
+       }
+
+       body, err := io.ReadAll(response.Body)
+       if err != nil {
+               return nil, err
+       }
+
+       unmarshalData, err := serializer.NewPipelinesDeserializer().Unmarshal(body)
+       if err != nil {
+               return nil, err
+       }
+
+       // Checked assertion: an unexpected deserializer result becomes an error
+       // instead of a panic inside library code.
+       pipelines, ok := unmarshalData.([]pipeline.Pipeline)
+       if !ok {
+               return nil, fmt.Errorf("unexpected type %T from pipelines deserializer", unmarshalData)
+       }
+
+       return pipelines, nil
+}
+
+// CreatePipeline stores a new pipeline.
+//
+// pp is serialized with the pipeline serializer and sent as the body of a
+// POST request to the pipelines endpoint; the response body is decoded into a
+// model.ResponseMessage. On any failure the zero-value ResponseMessage and a
+// non-nil error are returned.
+func (p *Pipeline) CreatePipeline(pp pipeline.Pipeline) (model.ResponseMessage, error) {
+       endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", nil)
+
+       body, err := serializer.NewPipelineSerializer().Marshal(pp)
+       if err != nil {
+               return model.ResponseMessage{}, err
+       }
+
+       response, err := p.executeRequest("POST", endPointUrl, body)
+       if err != nil {
+               return model.ResponseMessage{}, err
+       }
+       // Release the underlying connection on every return path below.
+       defer response.Body.Close()
+
+       if response.StatusCode != http.StatusOK {
+               if err = p.handleStatusCode(response); err != nil {
+                       return model.ResponseMessage{}, err
+               }
+       }
+
+       data, err := io.ReadAll(response.Body)
+       if err != nil {
+               return model.ResponseMessage{}, err
+       }
+
+       unmarshalData, err := serializer.NewResponseMessageDeserializer().Unmarshal(data)
+       if err != nil {
+               // The error is returned to the caller only; a library must not
+               // print debugging output to stdout.
+               return model.ResponseMessage{}, err
+       }
+
+       // Checked assertion: an unexpected deserializer result becomes an error
+       // instead of a panic inside library code.
+       message, ok := unmarshalData.(model.ResponseMessage)
+       if !ok {
+               return model.ResponseMessage{}, fmt.Errorf("unexpected type %T from response message deserializer", unmarshalData)
+       }
+
+       return message, nil
+}
+
+// StopSinglePipeline stops the pipeline with the given id.
+//
+// It sends a GET request to the "<pipelineId>/stop" path below the pipelines
+// endpoint and decodes the response body into a PipelineOperationStatus. On
+// any failure the zero-value status and a non-nil error are returned.
+func (p *Pipeline) StopSinglePipeline(pipelineId string) (pipeline.PipelineOperationStatus, error) {
+       endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId, "stop"})
+
+       response, err := p.executeRequest("GET", endPointUrl, nil)
+       if err != nil {
+               return pipeline.PipelineOperationStatus{}, err
+       }
+       // Release the underlying connection on every return path below.
+       defer response.Body.Close()
+
+       if response.StatusCode != http.StatusOK {
+               if err = p.handleStatusCode(response); err != nil {
+                       return pipeline.PipelineOperationStatus{}, err
+               }
+       }
+
+       body, err := io.ReadAll(response.Body)
+       if err != nil {
+               return pipeline.PipelineOperationStatus{}, err
+       }
+
+       unmarshalData, err := serializer.NewPipelineOperationStatusDeserializer().Unmarshal(body)
+       if err != nil {
+               return pipeline.PipelineOperationStatus{}, err
+       }
+
+       // Checked assertion: an unexpected deserializer result becomes an error
+       // instead of a panic inside library code.
+       status, ok := unmarshalData.(pipeline.PipelineOperationStatus)
+       if !ok {
+               return pipeline.PipelineOperationStatus{}, fmt.Errorf("unexpected type %T from pipeline operation status deserializer", unmarshalData)
+       }
+
+       return status, nil
+}
+
+// GetSinglePipelineStatus returns the status messages of the pipeline with
+// the given id.
+//
+// It sends a GET request to the "<pipelineId>/status" path below the
+// pipelines endpoint and decodes the response body into a slice of
+// PipelineStatusMessage. On any failure it returns a nil slice and a non-nil
+// error.
+func (p *Pipeline) GetSinglePipelineStatus(pipelineId string) ([]pipeline.PipelineStatusMessage, error) {
+       endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId, "status"})
+
+       response, err := p.executeRequest("GET", endPointUrl, nil)
+       if err != nil {
+               return nil, err
+       }
+       // Release the underlying connection on every return path below.
+       defer response.Body.Close()
+
+       if response.StatusCode != http.StatusOK {
+               if err = p.handleStatusCode(response); err != nil {
+                       return nil, err
+               }
+       }
+
+       body, err := io.ReadAll(response.Body)
+       if err != nil {
+               return nil, err
+       }
+
+       unmarshalData, err := serializer.NewPipelineStatusMessagesDeserializer().Unmarshal(body)
+       if err != nil {
+               return nil, err
+       }
+
+       // Checked assertion: an unexpected deserializer result becomes an error
+       // instead of a panic inside library code.
+       status, ok := unmarshalData.([]pipeline.PipelineStatusMessage)
+       if !ok {
+               return nil, fmt.Errorf("unexpected type %T from pipeline status messages deserializer", unmarshalData)
+       }
+
+       return status, nil
+}
+
+// StartSinglePipeline starts the pipeline with the given id.
+//
+// It sends a GET request to the "<pipelineId>/start" path below the pipelines
+// endpoint and decodes the response body into a PipelineOperationStatus. On
+// any failure the zero-value status and a non-nil error are returned.
+func (p *Pipeline) StartSinglePipeline(pipelineId string) (pipeline.PipelineOperationStatus, error) {
+       endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId, "start"})
+
+       response, err := p.executeRequest("GET", endPointUrl, nil)
+       if err != nil {
+               return pipeline.PipelineOperationStatus{}, err
+       }
+       // Release the underlying connection on every return path below.
+       defer response.Body.Close()
+
+       if response.StatusCode != http.StatusOK {
+               if err = p.handleStatusCode(response); err != nil {
+                       return pipeline.PipelineOperationStatus{}, err
+               }
+       }
+
+       body, err := io.ReadAll(response.Body)
+       if err != nil {
+               return pipeline.PipelineOperationStatus{}, err
+       }
+
+       unmarshalData, err := serializer.NewPipelineOperationStatusDeserializer().Unmarshal(body)
+       if err != nil {
+               return pipeline.PipelineOperationStatus{}, err
+       }
+
+       // Checked assertion: an unexpected deserializer result becomes an error
+       // instead of a panic inside library code.
+       status, ok := unmarshalData.(pipeline.PipelineOperationStatus)
+       if !ok {
+               return pipeline.PipelineOperationStatus{}, fmt.Errorf("unexpected type %T from pipeline operation status deserializer", unmarshalData)
+       }
+
+       return status, nil
+}
+
+// GetContainsElementPipeline returns all pipelines that contain the element
+// with the given element id.
+//
+// Despite its name, the pipelineId parameter carries the element id: it is
+// appended to the "pipelines/contains" endpoint path. The name is kept for
+// compatibility. On any failure it returns a nil slice and a non-nil error.
+func (p *Pipeline) GetContainsElementPipeline(pipelineId string) ([]pipeline.Pipeline, error) {
+       endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines/contains", []string{pipelineId})
+
+       response, err := p.executeRequest("GET", endPointUrl, nil)
+       if err != nil {
+               return nil, err
+       }
+       // Release the underlying connection on every return path below.
+       defer response.Body.Close()
+
+       if response.StatusCode != http.StatusOK {
+               if err = p.handleStatusCode(response); err != nil {
+                       return nil, err
+               }
+       }
+
+       body, err := io.ReadAll(response.Body)
+       if err != nil {
+               return nil, err
+       }
+
+       unmarshalData, err := serializer.NewPipelinesDeserializer().Unmarshal(body)
+       if err != nil {
+               return nil, err
+       }
+
+       // Checked assertion: an unexpected deserializer result becomes an error
+       // instead of a panic inside library code.
+       pipelines, ok := unmarshalData.([]pipeline.Pipeline)
+       if !ok {
+               return nil, fmt.Errorf("unexpected type %T from pipelines deserializer", unmarshalData)
+       }
+
+       return pipelines, nil
+}
+
 func (p *Pipeline) GetPipelineCategory() ([]pipeline.PipelineCategory, error) {
 
        endPointUrl := util.NewStreamPipesApiPath(p.config.Url, 
"streampipes-backend/api/v2/pipelinecategories", nil)
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := p.executeRequest("GET", endPointUrl)
+       response, err := p.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return nil, err
        }
@@ -76,7 +344,7 @@ func (p *Pipeline) DeletePipelineCategory(categoryId string) 
(model.ResponseMess
        endPointUrl := util.NewStreamPipesApiPath(p.config.Url, 
"streampipes-backend/api/v2/pipelinecategories", []string{categoryId})
        log.Printf("Delete data from: %s", endPointUrl)
 
-       response, err := p.executeRequest("DELETE", endPointUrl)
+       response, err := p.executeRequest("DELETE", endPointUrl, nil)
        if err != nil {
                return model.ResponseMessage{}, err
        }
diff --git a/streampipes-client-go/streampipes/streampipes_version_api.go 
b/streampipes-client-go/streampipes/streampipes_version_api.go
index 195ecc476d..6c5b924e0e 100644
--- a/streampipes-client-go/streampipes/streampipes_version_api.go
+++ b/streampipes-client-go/streampipes/streampipes_version_api.go
@@ -45,7 +45,7 @@ func (d *Versions) GetStreamPipesVersion() 
(streampipes_version.Versions, error)
        endPointUrl := util.NewStreamPipesApiPath(d.config.Url, 
"streampipes-backend/api/v2/info/versions", nil)
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := d.executeRequest("GET", endPointUrl)
+       response, err := d.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return streampipes_version.Versions{}, err
        }
diff --git a/streampipes-client-go/streampipes/user_api.go 
b/streampipes-client-go/streampipes/user_api.go
index 6499d50b7d..22e8832284 100644
--- a/streampipes-client-go/streampipes/user_api.go
+++ b/streampipes-client-go/streampipes/user_api.go
@@ -44,7 +44,7 @@ func (s *StreamPipesUserInfo) 
GetSingleStreamPipesUserAccountInfo(principalId st
        endPointUrl := util.NewStreamPipesApiPath(s.config.Url, 
"streampipes-backend/api/v2/users", []string{principalId})
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := s.executeRequest("GET", endPointUrl)
+       response, err := s.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return streampipes_user.UserAccount{}, err
        }
@@ -75,7 +75,7 @@ func (s *StreamPipesUserInfo) 
GetAllStreamPipesShortUserInfo() ([]streampipes_us
        endPointUrl := util.NewStreamPipesApiPath(s.config.Url, 
"streampipes-backend/api/v2/users", nil)
        log.Printf("Get data from: %s", endPointUrl)
 
-       response, err := s.executeRequest("GET", endPointUrl)
+       response, err := s.executeRequest("GET", endPointUrl, nil)
        if err != nil {
                return nil, err
        }
@@ -105,7 +105,7 @@ func (s *StreamPipesUserInfo) 
DeleteSingleStreamPipesShortUserInfo(principalId s
        endPointUrl := util.NewStreamPipesApiPath(s.config.Url, 
"streampipes-backend/api/v2/users", []string{principalId})
        log.Printf("Delete data from: %s", endPointUrl)
 
-       response, err := s.executeRequest("DELETE", endPointUrl)
+       response, err := s.executeRequest("DELETE", endPointUrl, nil)
        if err != nil {
                return err
        }


Reply via email to