This is an automated email from the ASF dual-hosted git repository.
zhenyu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 22ac2228cd feat: add more support for pipeline API to Go client (#3104)
22ac2228cd is described below
commit 22ac2228cd1601951710c6d906217db960937b47
Author: wyyolo <[email protected]>
AuthorDate: Sun Aug 25 23:19:20 2024 +0800
feat: add more support for pipeline API to Go client (#3104)
* feat: add more support for pipeline API to Go client
* fix: not enough arguments in call to d.executeRequest
* fix: not enough arguments in call to d.executeRequest
* fix: not enough arguments in call to d.executeRequest
* feat: add more support for pipeline API to Go client
* feat: add more support for pipeline API to Go client
* feat: add more support for pipeline API to Go client
* feat: add more support for pipeline API to Go client
* feat: add more support for pipeline API to Go client
* feat: add more support for pipeline API to Go client
---
.../streampipes/data_lake_dashboard_api.go | 6 +-
.../streampipes/data_lake_measure_api.go | 18 +-
.../streampipes/data_lake_widget_api.go | 6 +-
streampipes-client-go/streampipes/endpoint.go | 39 +--
streampipes-client-go/streampipes/functions_api.go | 8 +-
.../internal/serializer/deserializer.go | 62 +++++
.../streampipes/internal/serializer/serializer.go | 37 +++
streampipes-client-go/streampipes/model/common.go | 87 ++++++-
.../streampipes/model/pipeline/pipeline.go | 163 ++++++++++++
streampipes-client-go/streampipes/pipeline_api.go | 274 ++++++++++++++++++++-
.../streampipes/streampipes_version_api.go | 2 +-
streampipes-client-go/streampipes/user_api.go | 6 +-
12 files changed, 661 insertions(+), 47 deletions(-)
diff --git a/streampipes-client-go/streampipes/data_lake_dashboard_api.go
b/streampipes-client-go/streampipes/data_lake_dashboard_api.go
index 5c08c29fdb..728e5f7f74 100644
--- a/streampipes-client-go/streampipes/data_lake_dashboard_api.go
+++ b/streampipes-client-go/streampipes/data_lake_dashboard_api.go
@@ -43,7 +43,7 @@ func (d *DataLakeDashboard)
GetSingleDataLakeDashboard(dashboardId string) (data
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId})
log.Printf("Get data from: %s", endPointUrl)
- response, err := d.executeRequest("GET", endPointUrl)
+ response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return data_lake.Dashboard{}, err
}
@@ -73,7 +73,7 @@ func (d *DataLakeDashboard) GetAllDataLakeDashboard()
([]data_lake.Dashboard, er
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v3/datalake/dashboard", nil)
log.Printf("Get data from: %s", endPointUrl)
- response, err := d.executeRequest("GET", endPointUrl)
+ response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
@@ -103,7 +103,7 @@ func (d *DataLakeDashboard)
DeleteSingleDataLakeDashboard(dashboardId string) er
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId})
log.Printf("Delete data from: %s", endPointUrl)
- response, err := d.executeRequest("DELETE", endPointUrl)
+ response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
diff --git a/streampipes-client-go/streampipes/data_lake_measure_api.go
b/streampipes-client-go/streampipes/data_lake_measure_api.go
index d4719b81b2..012e7d1030 100644
--- a/streampipes-client-go/streampipes/data_lake_measure_api.go
+++ b/streampipes-client-go/streampipes/data_lake_measure_api.go
@@ -43,13 +43,13 @@ func NewDataLakeMeasures(clientConfig
config.StreamPipesClientConfig) *DataLakeM
}
}
-// AllDataLakeMeasure retrieves a list of all measurements series from the
Data Lake.
-func (d *DataLakeMeasure) AllDataLakeMeasure() ([]data_lake.DataLakeMeasure,
error) {
+// GetAllDataLakeMeasure retrieves a list of all measurements series from the
Data Lake.
+func (d *DataLakeMeasure) GetAllDataLakeMeasure()
([]data_lake.DataLakeMeasure, error) {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v4/datalake/measurements", nil)
log.Printf("Get data from: %s", endPointUrl)
- response, err := d.executeRequest("GET", endPointUrl)
+ response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
@@ -81,7 +81,7 @@ func (d *DataLakeMeasure) DeleteDataLakeMeasurements() error {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v4/datalake/measurements", nil)
log.Printf("Delete data from: %s", endPointUrl)
- response, err := d.executeRequest("DELETE", endPointUrl)
+ response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
@@ -102,7 +102,7 @@ func (d *DataLakeMeasure)
GetSingleDataLakeMeasure(elementId string) (data_lake.
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v4/datalake/measure", []string{elementId})
log.Printf("Get data from: %s", endPointUrl)
- response, err := d.executeRequest("GET", endPointUrl)
+ response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return data_lake.DataLakeMeasure{}, err
}
@@ -134,7 +134,7 @@ func (d *DataLakeMeasure)
DeleteSingleDataLakeMeasure(elementId string) error {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v4/datalake/measure", []string{elementId})
log.Printf("Delete data from: %s", endPointUrl)
- response, err := d.executeRequest("DELETE", endPointUrl)
+ response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
@@ -157,7 +157,7 @@ func (d *DataLakeMeasure) GetSingleDataSeries(measureId
string) (*data_lake.Data
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v4/datalake/measurements", []string{measureId})
log.Printf("Get data from: %s", endPointUrl)
- response, err := d.executeRequest("GET", endPointUrl)
+ response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
@@ -190,7 +190,7 @@ func (d *DataLakeMeasure)
ClearDataLakeMeasureData(measureId string) error {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v4/datalake/measurements", []string{measureId})
log.Printf("Clear data from: %s", endPointUrl)
- response, err := d.executeRequest("DELETE", endPointUrl)
+ response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
@@ -212,7 +212,7 @@ func (d *DataLakeMeasure) DeleteDataLakeMeasure(measureId
string) error {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v4/datalake/measurements", []string{measureId, "drop"})
log.Printf("Delete data from: %s", endPointUrl)
- response, err := d.executeRequest("DELETE", endPointUrl)
+ response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
diff --git a/streampipes-client-go/streampipes/data_lake_widget_api.go
b/streampipes-client-go/streampipes/data_lake_widget_api.go
index 4988408f33..f010dd77bd 100644
--- a/streampipes-client-go/streampipes/data_lake_widget_api.go
+++ b/streampipes-client-go/streampipes/data_lake_widget_api.go
@@ -44,7 +44,7 @@ func (d *DataLakeWidget) GetSingleDataLakeWidget(widgetId
string) (data_lake.Dat
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId})
log.Printf("Get data from: %s", endPointUrl)
- response, err := d.executeRequest("GET", endPointUrl)
+ response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return data_lake.DataExplorerWidgetModel{}, err
}
@@ -75,7 +75,7 @@ func (d *DataLakeWidget) GetAllDataLakeWidget()
([]data_lake.DataExplorerWidgetM
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v3/datalake/dashboard/widgets", nil)
log.Printf("Get data from: %s", endPointUrl)
- response, err := d.executeRequest("GET", endPointUrl)
+ response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
@@ -105,7 +105,7 @@ func (d *DataLakeWidget)
DeleteSingleDataLakeWidget(widgetId string) error {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId})
log.Printf("Delete data from: %s", endPointUrl)
- response, err := d.executeRequest("DELETE", endPointUrl)
+ response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
diff --git a/streampipes-client-go/streampipes/endpoint.go
b/streampipes-client-go/streampipes/endpoint.go
index 41384b7e91..5fccd5f3a5 100644
--- a/streampipes-client-go/streampipes/endpoint.go
+++ b/streampipes-client-go/streampipes/endpoint.go
@@ -18,20 +18,25 @@
package streampipes
import (
- "errors"
- "net/http"
-
+ "bytes"
+ "fmt"
"github.com/apache/streampipes/streampipes-client-go/streampipes/config"
headers
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/http_headers"
+ "io"
+ "net/http"
)
type endpoint struct {
config config.StreamPipesClientConfig
}
-func (e *endpoint) executeRequest(method string, endPointUrl string)
(*http.Response, error) {
+func (e *endpoint) executeRequest(method string, endPointUrl string, body
[]byte) (*http.Response, error) {
- req, err := http.NewRequest(method, endPointUrl, nil)
+ var reader io.Reader
+ if body != nil {
+ reader = bytes.NewReader(body)
+ }
+ req, err := http.NewRequest(method, endPointUrl, reader)
if err != nil {
return nil, err
}
@@ -53,23 +58,23 @@ func (e *endpoint) handleStatusCode(resp *http.Response)
error {
switch resp.StatusCode {
case http.StatusUnauthorized:
- return errors.New("The streamPipes Backend returned an
unauthorized error.\nplease check your ApiUser and/or Apikey to be correct.")
+ return fmt.Errorf("response code %d:"+"The streamPipes Backend
returned an unauthorized error.\nplease check your ApiUser and/or Apikey to be
correct.", resp.StatusCode)
case http.StatusForbidden:
- return errors.New("There seems to be an issue with the access
rights of the given user and the resource you queried.\n" +
- "Apparently, this user is not allowed to query the
resource.\n" +
- "Please check the user's permissions or contact your
StreamPipes admin.")
+ return fmt.Errorf("response code %d:"+"There seems to be an
issue with the access rights of the given user and the resource you queried.\n"+
+ "Apparently, this user is not allowed to query the
resource.\n"+
+ "Please check the user's permissions or contact your
StreamPipes admin.", resp.StatusCode)
case http.StatusNotFound:
- return errors.New("There seems to be an issue with the Go
Client calling the API inappropriately.\n" +
- "This should not happen, but unfortunately did.\n" +
- "If you don't mind, it would be awesome to let us know
by creating an issue at https://github.com/apache/streampipes.\n")
+ return fmt.Errorf("response code %d:"+"There seems to be an
issue with the Go Client calling the API inappropriately.\n"+
+ "This should not happen, but unfortunately did.\n"+
+ "If you don't mind, it would be awesome to let us know
by creating an issue at https://github.com/apache/streampipes.\n",
resp.StatusCode)
case http.StatusMethodNotAllowed:
- return errors.New("There seems to be an issue with the Go
Client calling the API inappropriately.\n" +
- "This should not happen, but unfortunately did.\n" +
- "If you don't mind, it would be awesome to let us know
by creating an issue at https://github.com/apache/streampipes.\n")
+ return fmt.Errorf("response code %d:"+"There seems to be an
issue with the Go Client calling the API inappropriately.\n"+
+ "This should not happen, but unfortunately did.\n"+
+ "If you don't mind, it would be awesome to let us know
by creating an issue at https://github.com/apache/streampipes.\n",
resp.StatusCode)
case http.StatusInternalServerError:
- return errors.New("streamPipes internal error")
+ return fmt.Errorf("response code %d:"+"streamPipes internal
error", resp.StatusCode)
default:
- return errors.New(resp.Status)
+ return fmt.Errorf(resp.Status)
}
}
diff --git a/streampipes-client-go/streampipes/functions_api.go
b/streampipes-client-go/streampipes/functions_api.go
index 3b696753aa..14375f3e1e 100644
--- a/streampipes-client-go/streampipes/functions_api.go
+++ b/streampipes-client-go/streampipes/functions_api.go
@@ -44,7 +44,7 @@ func (f *Functions) GetFunctionLogs(functionId string)
([]functions.SpLogEntry,
endPointUrl := util.NewStreamPipesApiPath(f.config.Url,
"streampipes-backend/api/v2/functions", []string{functionId, "logs"})
log.Printf("Get data from: %s", endPointUrl)
- response, err := f.executeRequest("GET", endPointUrl)
+ response, err := f.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
@@ -75,7 +75,7 @@ func (f *Functions) GetFunctionMetrics(functionId string)
(functions.SpMetricsEn
endPointUrl := util.NewStreamPipesApiPath(f.config.Url,
"streampipes-backend/api/v2/functions", []string{functionId, "metrics"})
log.Printf("Get data from: %s", endPointUrl)
- response, err := f.executeRequest("GET", endPointUrl)
+ response, err := f.executeRequest("GET", endPointUrl, nil)
if err != nil {
return functions.SpMetricsEntry{}, err
}
@@ -106,7 +106,7 @@ func (f *Functions) GetAllFunction()
([]functions.FunctionDefinition, error) {
endPointUrl := util.NewStreamPipesApiPath(f.config.Url,
"streampipes-backend/api/v2/functions", nil)
log.Printf("Get data from: %s", endPointUrl)
- response, err := f.executeRequest("GET", endPointUrl)
+ response, err := f.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
@@ -136,7 +136,7 @@ func (f *Functions) DeleteSingleFunction(functionId string)
error {
endPointUrl := util.NewStreamPipesApiPath(f.config.Url,
"streampipes-backend/api/v2/functions", []string{functionId})
log.Printf("Delete data from: %s", endPointUrl)
- response, err := f.executeRequest("DELETE", endPointUrl)
+ response, err := f.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
diff --git
a/streampipes-client-go/streampipes/internal/serializer/deserializer.go
b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
index a6007b9e86..fd9bb40eb3 100644
--- a/streampipes-client-go/streampipes/internal/serializer/deserializer.go
+++ b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
@@ -267,3 +267,65 @@ func (p *UserAccountDeserializer) Unmarshal(data []byte)
(interface{}, error) {
return userAccount, nil
}
+
+type PipelineDeserializer struct{}
+
+func NewPipelineDeserializer() *PipelineDeserializer {
+ return &PipelineDeserializer{}
+}
+
+func (p *PipelineDeserializer) Unmarshal(data []byte) (interface{}, error) {
+ var pipeLine pipeline.Pipeline
+ err := json.Unmarshal(data, &pipeLine)
+ if err != nil {
+ return nil, err
+ }
+ return pipeLine, nil
+
+}
+
+type PipelinesDeserializer struct{}
+
+func NewPipelinesDeserializer() *PipelinesDeserializer {
+ return &PipelinesDeserializer{}
+}
+
+func (p *PipelinesDeserializer) Unmarshal(data []byte) (interface{}, error) {
+ var pipelines []pipeline.Pipeline
+ err := json.Unmarshal(data, &pipelines)
+ if err != nil {
+ return nil, err
+ }
+ return pipelines, nil
+
+}
+
+type PipelineStatusMessagesDeserializer struct{}
+
+func NewPipelineStatusMessagesDeserializer()
*PipelineStatusMessagesDeserializer {
+ return &PipelineStatusMessagesDeserializer{}
+}
+
+func (p *PipelineStatusMessagesDeserializer) Unmarshal(data []byte)
(interface{}, error) {
+ var pipelineStatusMessage []pipeline.PipelineStatusMessage
+ err := json.Unmarshal(data, &pipelineStatusMessage)
+ if err != nil {
+ return nil, err
+ }
+ return pipelineStatusMessage, nil
+}
+
+type PipelineOperationStatusDeserializer struct{}
+
+func NewPipelineOperationStatusDeserializer()
*PipelineOperationStatusDeserializer {
+ return &PipelineOperationStatusDeserializer{}
+}
+
+func (p *PipelineOperationStatusDeserializer) Unmarshal(data []byte)
(interface{}, error) {
+ var pipelineOperationStatus pipeline.PipelineOperationStatus
+ err := json.Unmarshal(data, &pipelineOperationStatus)
+ if err != nil {
+ return nil, err
+ }
+ return pipelineOperationStatus, nil
+}
diff --git
a/streampipes-client-go/streampipes/internal/serializer/serializer.go
b/streampipes-client-go/streampipes/internal/serializer/serializer.go
new file mode 100644
index 0000000000..24439a164b
--- /dev/null
+++ b/streampipes-client-go/streampipes/internal/serializer/serializer.go
@@ -0,0 +1,37 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package serializer
+
+import (
+ "encoding/json"
+
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/pipeline"
+)
+
+type PipelineSerializer struct{}
+
+func NewPipelineSerializer() PipelineSerializer {
+ return PipelineSerializer{}
+}
+
+func (p PipelineSerializer) Marshal(pp pipeline.Pipeline) ([]byte, error) {
+ data, err := json.Marshal(pp)
+ if err != nil {
+ return nil, err
+ }
+ return data, nil
+}
diff --git a/streampipes-client-go/streampipes/model/common.go
b/streampipes-client-go/streampipes/model/common.go
index 5a853fee55..7d71666411 100644
--- a/streampipes-client-go/streampipes/model/common.go
+++ b/streampipes-client-go/streampipes/model/common.go
@@ -65,12 +65,91 @@ type DataSeries struct {
type ResponseMessage struct {
Success bool `json:"success"`
- ElementName interface{} `json:"elementName"`
+ ElementName string `json:"elementName"`
Notifications []Notification `json:"notifications"`
}
type Notification struct {
- Title string `json:"title"`
- Description string `json:"description"`
- AdditionalInformation string `json:"additionalInformation"`
+ Title string `json:"title"`
+ Description interface{} `json:"description"`
+ AdditionalInformation string `json:"additionalInformation"`
+}
+
+type StaticPropertyType string
+
+const (
+ AnyStaticProperty StaticPropertyType =
"AnyStaticProperty"
+ CodeInputStaticProperty StaticPropertyType =
"CodeInputStaticProperty"
+ CollectionStaticProperty StaticPropertyType =
"CollectionStaticProperty"
+ ColorPickerStaticProperty StaticPropertyType =
"ColorPickerStaticProperty"
+ DomainStaticProperty StaticPropertyType =
"DomainStaticProperty"
+ FreeTextStaticProperty StaticPropertyType =
"FreeTextStaticProperty"
+ FileStaticProperty StaticPropertyType =
"FileStaticProperty"
+ MappingPropertyUnary StaticPropertyType =
"MappingPropertyUnary"
+ MappingPropertyNary StaticPropertyType =
"MappingPropertyNary"
+ MatchingStaticProperty StaticPropertyType =
"MatchingStaticProperty"
+ OneOfStaticProperty StaticPropertyType =
"OneOfStaticProperty"
+ RuntimeResolvableAnyStaticProperty StaticPropertyType =
"RuntimeResolvableAnyStaticProperty"
+ RuntimeResolvableGroupStaticProperty StaticPropertyType =
"RuntimeResolvableGroupStaticProperty"
+ RuntimeResolvableOneOfStaticProperty StaticPropertyType =
"RuntimeResolvableOneOfStaticProperty"
+ RuntimeResolvableTreeInputStaticProperty StaticPropertyType =
"RuntimeResolvableTreeInputStaticProperty"
+ StaticPropertyGroup StaticPropertyType =
"StaticPropertyGroup"
+ StaticPropertyAlternatives StaticPropertyType =
"StaticPropertyAlternatives"
+ StaticPropertyAlternative StaticPropertyType =
"StaticPropertyAlternative"
+ SecretStaticProperty StaticPropertyType =
"SecretStaticProperty"
+ SlideToggleStaticProperty StaticPropertyType =
"SlideToggleStaticProperty"
+)
+
+type StaticProperty struct {
+ Optional bool `json:"optional,omitempty"`
+ StaticPropertyType StaticPropertyType `json:"staticPropertyType"`
+ Index int32 `json:"index"`
+ Label string `json:"label"`
+ Description string `json:"description"`
+ InternalName string `json:"internalName"`
+ Predefined bool `json:"predefined"`
+ Class string `json:"@class"`
+}
+
+type SpDataStream struct {
+ ElementId string `json:"elementId"`
+ Dom string `json:"dom"`
+ ConnectedTo []string `json:"connectedTo"`
+ Name string `json:"name"`
+ Description string `json:"description"`
+ IconUrl string `json:"iconUrl"`
+ AppId string `json:"appId"`
+ IncludesAssets bool `json:"includesAssets"`
+ IncludesLocales bool `json:"includesLocales"`
+ IncludedAssets []string `json:"includedAssets"`
+ IncludedLocales []string `json:"includedLocales"`
+ InternallyManaged bool `json:"internallyManaged"`
+ EventGrounding EventGrounding `json:"eventGrounding"`
+ EventSchema EventSchema `json:"eventSchema"`
+ Category []string `json:"category"`
+ Index int32 `json:"index"`
+ CorrespondingAdapterId string `json:"correspondingAdapterId"`
+ Rev string `json:"_rev"`
+}
+
+type EventGrounding struct {
+ TransportProtocols []TransportProtocol `json:"transportProtocols"`
+ TransportFormats []TransportFormat `json:"transportFormats"`
+}
+
+type TransportProtocol struct {
+ ElementId string `json:"elementId"`
+ BrokerHostname string `json:"brokerHostname"`
+ TopicDefinition TopicDefinition `json:"topicDefinition"`
+ Class string `json:"@class,omitempty"`
+ Port int `json:"port"`
+}
+
+type TopicDefinition struct {
+ ActualTopicName string `json:"actualTopicName"`
+ Class string `json:"@class"`
+}
+
+type TransportFormat struct {
+ RdfType []string `json:"rdfType"`
}
diff --git a/streampipes-client-go/streampipes/model/pipeline/pipeline.go
b/streampipes-client-go/streampipes/model/pipeline/pipeline.go
new file mode 100644
index 0000000000..0586be659d
--- /dev/null
+++ b/streampipes-client-go/streampipes/model/pipeline/pipeline.go
@@ -0,0 +1,163 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package pipeline
+
+import (
+ "github.com/apache/streampipes/streampipes-client-go/streampipes/model"
+)
+
+type PipelineHealthStatus string
+
+const (
+ OK PipelineHealthStatus = "OK"
+ REQUIRES_ATTENTION PipelineHealthStatus = "REQUIRES_ATTENTION"
+ FAILURE PipelineHealthStatus = "FAILURE"
+)
+
+type Pipeline struct {
+ Sepas []DataProcessorInvocation //`json:"sepas"`
+ Streams []model.SpDataStream //`json:"streams"`
+ Name string //`json:"name"`
+ Description string
//`json:"description,omitempty"`
+ Actions []DataSinkInvocation //`json:"actions"`
+ Running bool //`json:"running"`
+ RestartOnSystemReboot bool
//`json:"restartOnSystemReboot"`
+ Valid bool //`json:"valid"`
+ StartedAt int64
//`json:"startedAt,omitempty"`
+ CreatedAt int64 //`json:"createdAt"`
+ PublicElement bool //`json:"publicElement"`
+ CreatedByUser string //`json:"createdByUser"`
+ PipelineCategories []string
//`json:"pipelineCategories"`
+ PipelineNotifications []string
//`json:"pipelineNotifications"`
+ HealthStatus PipelineHealthStatus //`json:"healthStatus"`
+ ID string //`json:"_id,omitempty"`
+ Rev string
//`json:"_rev,omitempty"`
+}
+
+type DataProcessorInvocation struct {
+ ElementId string `json:"elementId"`
+ Dom string `json:"dom"`
+ ConnectedTo []string `json:"connectedTo"`
+ Name string `json:"name"`
+ Description string `json:"description"`
+ IconUrl string `json:"iconUrl"`
+ AppId string `json:"appId"`
+ IncludesAssets bool `json:"includesAssets"`
+ IncludesLocales bool `json:"includesLocales"`
+ IncludedAssets []string `json:"includedAssets"`
+ IncludedLocales []string `json:"includedLocales"`
+ InternallyManaged bool
`json:"internallyManaged"`
+ Version int32 `json:"version"`
+ InputStreams []model.SpDataStream `json:"inputStreams"`
+ StaticProperties []model.StaticProperty
`json:"staticProperties"`
+ BelongsTo string `json:"belongsTo"`
+ StatusInfoSettings ElementStatusInfoSettings
`json:"statusInfoSettings"`
+ SupportedGrounding model.EventGrounding
`json:"supportedGrounding"`
+ CorrespondingPipeline string
`json:"correspondingPipeline"`
+ CorresponddingUser string
`json:"correspondingUser"`
+ StreamRequirements []model.SpDataStream
`json:"streamRequirements"`
+ Configured bool `json:"configured"`
+ Uncompleted bool `json:"uncompleted"`
+ SelectedEndpointUrl string
`json:"selectedEndpointUrl"`
+ OutputStream model.SpDataStream `json:"outputStream"`
+ OutputStrategies []OutputStrategy
`json:"outputStrategies"`
+ PathName string `json:"pathName"`
+ Category []string `json:"category"`
+ Rev string `json:"_rev"`
+}
+
+type ElementStatusInfoSettings struct {
+ ElementIdentifier string `json:"elementIdentifier"`
+ KafkaHost string `json:"kafkaHost"`
+ KafkaPort int32 `json:"kafkaPort"`
+ ErrorTopic string `json:"errorTopic"`
+ StatsTopic string `json:"statsTopic"`
+}
+
+type OutputStrategy struct {
+ Name string `json:"name"`
+ RenameRules []PropertyRenameRule `json:"renameRules"`
+ Class string `json:"class,omitempty"`
+}
+
+type PropertyRenameRule struct {
+ RuntimeID string `json:"runtimeId"`
+ NewRuntimeName string `json:"newRuntimeName"`
+}
+
+type DataSinkInvocation struct {
+ ElementId string `json:"elementId"`
+ Dom string `json:"dom"`
+ ConnectedTo []string `json:"connectedTo"`
+ Name string `json:"name"`
+ Description string `json:"description"`
+ IconUrl string `json:"iconUrl"`
+ AppId string `json:"appId"`
+ IncludesAssets bool `json:"includesAssets"`
+ IncludesLocales bool `json:"includesLocales"`
+ IncludedAssets []string `json:"includedAssets"`
+ IncludedLocales []string `json:"includedLocales"`
+ InternallyManaged bool
`json:"internallyManaged"`
+ Version int32 `json:"version"`
+ InputStreams []model.SpDataStream `json:"inputStreams"`
+ StaticProperties []model.StaticProperty
`json:"staticProperties"`
+ BelongsTo string `json:"belongsTo"`
+ StatusInfoSettings ElementStatusInfoSettings
`json:"statusInfoSettings"`
+ SupportedGrounding model.EventGrounding
`json:"supportedGrounding"`
+ CorrespondingPipeline string
`json:"correspondingPipeline"`
+ CorrespondingUser string
`json:"correspondingUser"`
+ StreamRequirements []model.SpDataStream
`json:"streamRequirements"`
+ Configured bool `json:"configured"`
+ Uncompleted bool `json:"uncompleted"`
+ SelectedEndpointUrl string
`json:"selectedEndpointUrl"`
+ Category []string `json:"category"`
+ Rev string `json:"_rev"`
+}
+
+type PipelineElementStatus struct {
+ ElementID string `json:"elementId"`
+ ElementName string `json:"elementName"`
+ OptionalMessage string `json:"optionalMessage"`
+ Success bool `json:"success"`
+}
+
+type PipelineOperationStatus struct {
+ PipelineId string `json:"pipelineId"`
+ PipelineName string `json:"pipelineName"`
+ Title string `json:"title"`
+ Success bool `json:"success"`
+ ElementStatus []PipelineElementStatus `json:"elementStatus"`
+}
+
+type PipelineStatusMessage struct {
+ PipelineId string `json:"pipelineId"`
+ Timestamp int64 `json:"timestamp"`
+ MessageType string `json:"messageType"`
+ Message string `json:"message"`
+}
+type PipelineElementValidationLevel string
+
+const (
+ ValidationInfo PipelineElementValidationLevel = "INFO"
+ ValidationError PipelineElementValidationLevel = "ERROR"
+)
+
+type PipelineElementValidationInfo struct {
+ Level PipelineElementValidationLevel `json:"level"`
+ Message string `json:"message"`
+}
diff --git a/streampipes-client-go/streampipes/pipeline_api.go
b/streampipes-client-go/streampipes/pipeline_api.go
index ba1e49f7fe..e58ccc7096 100644
--- a/streampipes-client-go/streampipes/pipeline_api.go
+++ b/streampipes-client-go/streampipes/pipeline_api.go
@@ -18,10 +18,10 @@
package streampipes
import (
+ "fmt"
"io"
"log"
"net/http"
-
"github.com/apache/streampipes/streampipes-client-go/streampipes/config"
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/serializer"
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/util"
@@ -40,12 +40,280 @@ func NewPipeline(clientConfig
config.StreamPipesClientConfig) *Pipeline {
}
}
+// GetSinglePipeline get a specific pipeline with the given id
+func (p *Pipeline) GetSinglePipeline(pipelineId string) (pipeline.Pipeline,
error) {
+
+ endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines", []string{pipelineId})
+ log.Printf("Get data from: %s", endPointUrl)
+
+ response, err := p.executeRequest("GET", endPointUrl, nil)
+ if err != nil {
+ return pipeline.Pipeline{}, err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = p.handleStatusCode(response)
+ if err != nil {
+ return pipeline.Pipeline{}, err
+ }
+ }
+
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return pipeline.Pipeline{}, err
+ }
+
+ unmarshalData, err :=
serializer.NewPipelineDeserializer().Unmarshal(body)
+ if err != nil {
+ return pipeline.Pipeline{}, err
+ }
+ pipeLine := unmarshalData.(pipeline.Pipeline)
+
+ return pipeLine, nil
+}
+
+// DeleteSinglePipeline delete a pipeline with a given id
+func (p *Pipeline) DeleteSinglePipeline(pipelineId string) error {
+
+ endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines", []string{pipelineId})
+ log.Printf("Delete data from: %s", endPointUrl)
+
+ response, err := p.executeRequest("DELETE", endPointUrl, nil)
+ if err != nil {
+ return err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = p.handleStatusCode(response)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// UpdateSinglePipeline update an existing pipeline.
+// If the pipeline cannot be updated successfully, it may be due to the
incorrect pipeline that was passed in.
+func (p *Pipeline) UpdateSinglePipeline(pp pipeline.Pipeline, pipelineId
string) (model.ResponseMessage, error) {
+ endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines", []string{pipelineId})
+ body, err := serializer.NewPipelineSerializer().Marshal(pp)
+ if err != nil {
+ return model.ResponseMessage{}, err
+ }
+ response, err := p.executeRequest("PUT", endPointUrl, body)
+ if err != nil {
+ return model.ResponseMessage{}, err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = p.handleStatusCode(response)
+ if err != nil {
+ return model.ResponseMessage{}, err
+ }
+ }
+ data, err := io.ReadAll(response.Body)
+ if err != nil {
+ return model.ResponseMessage{}, err
+ }
+
+ unmarshalData, err :=
serializer.NewResponseMessageDeserializer().Unmarshal(data)
+ if err != nil {
+ return model.ResponseMessage{}, err
+ }
+ message := unmarshalData.(model.ResponseMessage)
+
+ return message, nil
+}
+
+// GetAllPipeline get all pipelines of the current user
+func (p *Pipeline) GetAllPipeline() ([]pipeline.Pipeline, error) {
+ endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines", nil)
+
+ response, err := p.executeRequest("GET", endPointUrl, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = p.handleStatusCode(response)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ unmarshalData, err :=
serializer.NewPipelinesDeserializer().Unmarshal(body)
+ if err != nil {
+ return nil, err
+ }
+ pipelines := unmarshalData.([]pipeline.Pipeline)
+
+ return pipelines, nil
+}
+
+// CreatePipeline store a new pipeline
+func (p *Pipeline) CreatePipeline(pp pipeline.Pipeline)
(model.ResponseMessage, error) {
+ endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines", nil)
+
+ body, err := serializer.NewPipelineSerializer().Marshal(pp)
+ if err != nil {
+ return model.ResponseMessage{}, err
+ }
+ response, err := p.executeRequest("POST", endPointUrl, body)
+ if err != nil {
+ return model.ResponseMessage{}, err
+ }
+ if response.StatusCode != http.StatusOK {
+ err = p.handleStatusCode(response)
+ if err != nil {
+ return model.ResponseMessage{}, err
+ }
+ }
+ data, err := io.ReadAll(response.Body)
+ if err != nil {
+ return model.ResponseMessage{}, err
+ }
+
+ unmarshalData, err :=
serializer.NewResponseMessageDeserializer().Unmarshal(data)
+ if err != nil {
+ fmt.Println(err, 11)
+ return model.ResponseMessage{}, err
+ }
+ message := unmarshalData.(model.ResponseMessage)
+
+ return message, nil
+}
+
+// StopSinglePipeline stop the pipeline with the given id
+func (p *Pipeline) StopSinglePipeline(pipelineId string)
(pipeline.PipelineOperationStatus, error) {
+ endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines", []string{pipelineId, "stop"})
+
+ response, err := p.executeRequest("GET", endPointUrl, nil)
+ if err != nil {
+ return pipeline.PipelineOperationStatus{}, err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = p.handleStatusCode(response)
+ if err != nil {
+ return pipeline.PipelineOperationStatus{}, err
+ }
+ }
+
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return pipeline.PipelineOperationStatus{}, err
+ }
+
+ unmarshalData, err :=
serializer.NewPipelineOperationStatusDeserializer().Unmarshal(body)
+ if err != nil {
+ return pipeline.PipelineOperationStatus{}, err
+ }
+ status := unmarshalData.(pipeline.PipelineOperationStatus)
+
+ return status, nil
+}
+
+// GetSinglePipelineStatus get the pipeline status of a given pipeline
+func (p *Pipeline) GetSinglePipelineStatus(pipelineId string)
([]pipeline.PipelineStatusMessage, error) {
+ endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines", []string{pipelineId, "status"})
+
+ response, err := p.executeRequest("GET", endPointUrl, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = p.handleStatusCode(response)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ unmarshalData, err :=
serializer.NewPipelineStatusMessagesDeserializer().Unmarshal(body)
+ if err != nil {
+ return nil, err
+ }
+ status := unmarshalData.([]pipeline.PipelineStatusMessage)
+
+ return status, nil
+}
+
+// StartSinglePipeline start the pipeline with the given id
+func (p *Pipeline) StartSinglePipeline(pipelineId string)
(pipeline.PipelineOperationStatus, error) {
+ endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines", []string{pipelineId, "start"})
+
+ response, err := p.executeRequest("GET", endPointUrl, nil)
+ if err != nil {
+ return pipeline.PipelineOperationStatus{}, err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = p.handleStatusCode(response)
+ if err != nil {
+ return pipeline.PipelineOperationStatus{}, err
+ }
+ }
+
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return pipeline.PipelineOperationStatus{}, err
+ }
+
+ unmarshalData, err :=
serializer.NewPipelineOperationStatusDeserializer().Unmarshal(body)
+ if err != nil {
+ return pipeline.PipelineOperationStatus{}, err
+ }
+ status := unmarshalData.(pipeline.PipelineOperationStatus)
+
+ return status, nil
+}
+
+// GetContainsElementPipeline returns all pipelines that contain the element
with the elementld
+func (p *Pipeline) GetContainsElementPipeline(pipelineId string)
([]pipeline.Pipeline, error) {
+ endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines/contains", []string{pipelineId})
+
+ response, err := p.executeRequest("GET", endPointUrl, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ if response.StatusCode != http.StatusOK {
+ err = p.handleStatusCode(response)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ unmarshalData, err :=
serializer.NewPipelinesDeserializer().Unmarshal(body)
+ if err != nil {
+ return nil, err
+ }
+ pipelines := unmarshalData.([]pipeline.Pipeline)
+
+ return pipelines, nil
+}
+
func (p *Pipeline) GetPipelineCategory() ([]pipeline.PipelineCategory, error) {
endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelinecategories", nil)
log.Printf("Get data from: %s", endPointUrl)
- response, err := p.executeRequest("GET", endPointUrl)
+ response, err := p.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
@@ -76,7 +344,7 @@ func (p *Pipeline) DeletePipelineCategory(categoryId string)
(model.ResponseMess
endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelinecategories", []string{categoryId})
log.Printf("Delete data from: %s", endPointUrl)
- response, err := p.executeRequest("DELETE", endPointUrl)
+ response, err := p.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return model.ResponseMessage{}, err
}
diff --git a/streampipes-client-go/streampipes/streampipes_version_api.go
b/streampipes-client-go/streampipes/streampipes_version_api.go
index 195ecc476d..6c5b924e0e 100644
--- a/streampipes-client-go/streampipes/streampipes_version_api.go
+++ b/streampipes-client-go/streampipes/streampipes_version_api.go
@@ -45,7 +45,7 @@ func (d *Versions) GetStreamPipesVersion()
(streampipes_version.Versions, error)
endPointUrl := util.NewStreamPipesApiPath(d.config.Url,
"streampipes-backend/api/v2/info/versions", nil)
log.Printf("Get data from: %s", endPointUrl)
- response, err := d.executeRequest("GET", endPointUrl)
+ response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return streampipes_version.Versions{}, err
}
diff --git a/streampipes-client-go/streampipes/user_api.go
b/streampipes-client-go/streampipes/user_api.go
index 6499d50b7d..22e8832284 100644
--- a/streampipes-client-go/streampipes/user_api.go
+++ b/streampipes-client-go/streampipes/user_api.go
@@ -44,7 +44,7 @@ func (s *StreamPipesUserInfo)
GetSingleStreamPipesUserAccountInfo(principalId st
endPointUrl := util.NewStreamPipesApiPath(s.config.Url,
"streampipes-backend/api/v2/users", []string{principalId})
log.Printf("Get data from: %s", endPointUrl)
- response, err := s.executeRequest("GET", endPointUrl)
+ response, err := s.executeRequest("GET", endPointUrl, nil)
if err != nil {
return streampipes_user.UserAccount{}, err
}
@@ -75,7 +75,7 @@ func (s *StreamPipesUserInfo)
GetAllStreamPipesShortUserInfo() ([]streampipes_us
endPointUrl := util.NewStreamPipesApiPath(s.config.Url,
"streampipes-backend/api/v2/users", nil)
log.Printf("Get data from: %s", endPointUrl)
- response, err := s.executeRequest("GET", endPointUrl)
+ response, err := s.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
@@ -105,7 +105,7 @@ func (s *StreamPipesUserInfo)
DeleteSingleStreamPipesShortUserInfo(principalId s
endPointUrl := util.NewStreamPipesApiPath(s.config.Url,
"streampipes-backend/api/v2/users", []string{principalId})
log.Printf("Delete data from: %s", endPointUrl)
- response, err := s.executeRequest("DELETE", endPointUrl)
+ response, err := s.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}