This is an automated email from the ASF dual-hosted git repository.
zhenyu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 46de302e90 feat: implement the e2e testing framework of go-client
(#3261)
46de302e90 is described below
commit 46de302e905a556a3e8682d5845ea92552318dc7
Author: wenyu <[email protected]>
AuthorDate: Wed Oct 30 20:04:08 2024 +0800
feat: implement the e2e testing framework of go-client (#3261)
* feat: Implement the e2e testing framework of go-client
* feat: Implement the e2e testing framework of go-client
* feat: Implement the e2e testing framework of go-client
* feat: Implement the e2e testing framework of go-client
* feat: supplement the model
* fix:unmarshal fail
* fix:ci fail
* fix:ci fail
* fix:ci fail
* fix:ci fail
* fix:ci fail
* fix:ci fail
* fix:ci fail
* fix:ci fail
* fix:ci fail
* fix:ci fail
* fix:ci fail
* feat : implement datalake_test
* fix : unmarshal fail
---
.github/workflows/go-client-e2e-test.yml | 66 ++
streampipes-client-e2e/README.md | 46 +
streampipes-client-e2e/docker-compose.yml | 135 +++
.../go-client-e2e/adapter/machine.json | 225 +++++
.../go-client-e2e/adapter_test.go | 134 +++
.../go-client-e2e/datalake_test.go | 52 ++
streampipes-client-e2e/go-client-e2e/go.mod | 23 +
.../go-client-e2e/pipeline_test.go | 138 +++
.../go-client-e2e/pipelines/pipelines.json | 960 +++++++++++++++++++++
.../go-client-e2e/utils/create_machine_apadter.go | 37 +
.../go-client-e2e/utils/streampipes_client.go | 36 +
streampipes-client-e2e/tool/go-client-e2e.sh | 61 ++
streampipes-client-e2e/tool/install-element.sh | 113 +++
.../tool/start-streampipes-client-e2e.sh | 148 ++++
streampipes-client-go/streampipes/adapter_api.go | 11 +-
.../internal/serializer/deserializer.go | 158 ++--
.../model/adapter/adapter_description.go | 1 +
streampipes-client-go/streampipes/model/common.go | 45 +
.../streampipes/model/pipeline/pipeline.go | 34 +-
streampipes-client-go/streampipes/pipeline_api.go | 14 +-
20 files changed, 2349 insertions(+), 88 deletions(-)
diff --git a/.github/workflows/go-client-e2e-test.yml
b/.github/workflows/go-client-e2e-test.yml
new file mode 100644
index 0000000000..4adec93e16
--- /dev/null
+++ b/.github/workflows/go-client-e2e-test.yml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Builds StreamPipes from source, packages the backend and extensions images,
+# starts the e2e docker-compose stack and runs the Go client e2e tests
+# against it on every pull request.
+name: run-e2e-tests
+
+on:
+  pull_request:
+
+jobs:
+  build-and-push-to-docker:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v1
+
+      # actions/cache v1/v2 have been retired by GitHub and fail on hosted
+      # runners, so a currently supported major version is required here.
+      - name: Cache Docker layers
+        uses: actions/cache@v4
+        with:
+          path: /tmp/.buildx-cache
+          key: ${{ runner.os }}-buildx-${{ github.sha }}
+          restore-keys: |
+            ${{ runner.os }}-buildx-
+
+      - name: Set up Go 1.21
+        uses: actions/setup-go@v3
+        with:
+          go-version: '1.21'
+
+      - name: Set up JDK 17
+        uses: actions/setup-java@v4
+        with:
+          distribution: 'temurin'
+          java-version: '17'
+          cache: 'maven'
+
+      - name: Build with Maven
+        run: mvn clean install
+
+      # The image tags must match the ones referenced by
+      # streampipes-client-e2e/docker-compose.yml.
+      - name: Build Docker image
+        run: |
+          docker build -t streampipes_pipeline-elements-all-jvm:release-validation ./streampipes-extensions/streampipes-extensions-all-jvm
+          docker build -t streampipes_backend:release-validation ./streampipes-service-core
+
+      - name: Start streampipes
+        run: docker compose -f ./streampipes-client-e2e/docker-compose.yml -p streampipes-client-e2e up -d
+
+      # The start script must be executed from within the tool directory.
+      - name: Start go client test
+        run: |
+          cd ./streampipes-client-e2e/tool
+          chmod +x ./start-streampipes-client-e2e.sh
+          ./start-streampipes-client-e2e.sh -t go-client-e2e.sh
\ No newline at end of file
diff --git a/streampipes-client-e2e/README.md b/streampipes-client-e2e/README.md
new file mode 100644
index 0000000000..dcd05bc21b
--- /dev/null
+++ b/streampipes-client-e2e/README.md
@@ -0,0 +1,46 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+# Client E2E
+
+## Environment Setup
+Before running, ensure that StreamPipes is up and running, and pass the corresponding
configuration to the `start-streampipes-client-e2e.sh` script.
+**Note**: This script must be executed from within the [tool](./tool)
directory.
+```shell
+./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u
[email protected] -pw admin -t go-client-e2e.sh
+```
+
+## Usage Instructions
+| Parameter | Default | Required | Description
|
+|-----------|-------------------|----------|-----------------------------------------------------------------------------------------------------------|
+| `-h` | `127.0.0.1` | No | Host address
|
+| `-p` | `8030` | No | Backend port
|
+| `-u` | `[email protected]` | No | User
|
+| `-pw` | `admin` | No | Password
|
+| `-t` | `""` | Yes | Name of the client's E2E test
startup script (the script must be located in the [tool](./tool) directory) |
+
+## How to add E2E in a new language
+1. To define an E2E test in a new language, download the elements it needs
in `install-element.sh`.
+2. To accommodate different languages, you need to write an E2E startup
script. You can refer to `go-client-e2e.sh`. The startup script will receive
four parameters, so it can easily obtain the information needed to run the
Client.
+
+| Parameter | Description |
+|-----------|-------------|
+| `-h` | Host |
+| `-p` | Backend port|
+| `-u` | User |
+| `-k` | API Key |
\ No newline at end of file
diff --git a/streampipes-client-e2e/docker-compose.yml
b/streampipes-client-e2e/docker-compose.yml
new file mode 100644
index 0000000000..09e2c9baa5
--- /dev/null
+++ b/streampipes-client-e2e/docker-compose.yml
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "3.8"
+
+# global logging
+x-logging: &default-logging
+ options:
+ max-size: "12m"
+ max-file: "5"
+ driver: json-file
+
+services:
+ #### apache/streampipes
+ backend:
+ image: streampipes_backend:release-validation
+ ports:
+ - "8030:8030"
+ depends_on:
+ - couchdb
+ volumes:
+ - backend:/root/.streampipes
+ - files:/spImages
+ logging: *default-logging
+ networks:
+ spnet:
+
+ extensions-all-jvm:
+ image: streampipes_pipeline-elements-all-jvm:release-validation
+ volumes:
+ - files:/spImages
+ logging: *default-logging
+ networks:
+ spnet:
+
+ couchdb:
+ image: couchdb:2.3.1
+ environment:
+ - COUCHDB_USER=admin
+ - COUCHDB_PASSWORD=admin
+ logging: *default-logging
+ volumes:
+ - couchdb:/opt/couchdb/data
+ networks:
+ spnet:
+
+ kafka:
+ image: fogsyio/kafka:2.2.0
+ hostname: kafka
+ depends_on:
+ - zookeeper
+ environment:
+ # see: https://github.com/confluentinc/schema-registry/issues/648
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
+ KAFKA_LISTENERS: PLAINTEXT://:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_ADVERTISED_HOST_NAME: kafka
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ volumes:
+ - kafka:/kafka
+ - /var/run/docker.sock:/var/run/docker.sock
+ logging: *default-logging
+ networks:
+ spnet:
+
+ zookeeper:
+ image: fogsyio/zookeeper:3.4.13
+ logging: *default-logging
+ volumes:
+ - zookeeper:/opt/zookeeper-3.4.13
+ networks:
+ spnet:
+
+ influxdb:
+ image: influxdb:2.6
+ environment:
+ - INFLUXDB_DATA_ENGINE=tsm1
+ - INFLUXDB_REPORTING_DISABLED=false
+ - INFLUXDB_ADMIN_ENABLED=true
+ - DOCKER_INFLUXDB_INIT_USERNAME=admin
+ - DOCKER_INFLUXDB_INIT_PASSWORD=sp-admin
+ - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=sp-admin
+ - DOCKER_INFLUXDB_INIT_ORG=sp
+ - DOCKER_INFLUXDB_INIT_BUCKET=sp
+ - DOCKER_INFLUXDB_INIT_MODE=setup
+ volumes:
+ - influxdb:/var/lib/influxdb
+ - influxdb2:/var/lib/influxdb2
+ logging: *default-logging
+ networks:
+ spnet:
+
+ mosquitto:
+ image: eclipse-mosquitto:1.5.4
+ logging: *default-logging
+ networks:
+ spnet:
+
+ opcua:
+ image: mcr.microsoft.com/iotedge/opc-plc
+ logging: *default-logging
+ command: --ut=true
+ networks:
+ spnet:
+
+volumes:
+ kafka:
+ files:
+ couchdb:
+ zookeeper:
+ influxdb:
+ influxdb2:
+ backend:
+ nginx:
+
+networks:
+ spnet:
+ name: spnet
+ driver: bridge
+ ipam:
+ config:
+ - subnet: 172.31.0.0/16
diff --git a/streampipes-client-e2e/go-client-e2e/adapter/machine.json
b/streampipes-client-e2e/go-client-e2e/adapter/machine.json
new file mode 100644
index 0000000000..d50a60be01
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/adapter/machine.json
@@ -0,0 +1,225 @@
+{
+ "@class": "org.apache.streampipes.model.connect.adapter.AdapterDescription",
+ "appId": "org.apache.streampipes.connect.iiot.adapters.simulator.machine",
+ "connectedTo": null,
+ "description": "",
+ "dom": null,
+ "elementId":
"sp:org.apache.streampipes.connect.iiot.adapters.simulator.machine",
+ "includedAssets": [
+ "documentation.md",
+ "icon.png"
+ ],
+ "includedLocales": [
+ "strings.en"
+ ],
+ "includesAssets": true,
+ "includesLocales": true,
+ "internallyManaged": false,
+ "name": "test1",
+ "rev": "1-73ca816b2ed82d8232b864556c7f2d64",
+ "version": 0,
+ "category": [
+ "Debugging"
+ ],
+ "config": [
+ {
+ "@class":
"org.apache.streampipes.model.staticproperty.FreeTextStaticProperty",
+ "description": "The time to wait between two events in milliseconds",
+ "internalName": "wait-time-ms",
+ "label": "Wait Time (MS)",
+ "optional": false,
+ "staticPropertyType": "FreeTextStaticProperty",
+ "htmlAllowed": false,
+ "htmlFontFormat": false,
+ "mapsTo": null,
+ "multiLine": false,
+ "placeholdersSupported": false,
+ "requiredDatatype": "http://www.w3.org/2001/XMLSchema#integer",
+ "requiredDomainProperty": null,
+ "value": "1000",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.staticproperty.OneOfStaticProperty",
+ "description": "Select simulated sensor data to be published",
+ "internalName": "selected-simulator-option",
+ "label": "Select sensor",
+ "optional": false,
+ "staticPropertyType": "OneOfStaticProperty",
+ "horizontalRendering": false,
+ "options": [
+ {
+ "elementId": "sp:option:pOPYGJ",
+ "internalName": null,
+ "name": "flowrate",
+ "selected": true
+ },
+ {
+ "elementId": "sp:option:CpeHtP",
+ "internalName": null,
+ "name": "pressure",
+ "selected": false
+ },
+ {
+ "elementId": "sp:option:KYnhkL",
+ "internalName": null,
+ "name": "waterlevel",
+ "selected": false
+ }
+ ]
+ }
+ ],
+ "correspondingDataStreamElementId": null,
+ "correspondingServiceGroup": null,
+ "createdAt": 0,
+ "dataStream": {
+ "@class": "org.apache.streampipes.model.SpDataStream",
+ "appId": "urn:streampipes.apache.org:eventstream:hLNLBx",
+ "connectedTo": null,
+ "description": null,
+ "dom": null,
+ "elementId": "urn:streampipes.apache.org:eventstream:hLNLBx",
+ "includedAssets": [],
+ "includedLocales": [],
+ "includesAssets": false,
+ "includesLocales": false,
+ "internallyManaged": false,
+ "name": null,
+ "rev": null,
+ "category": null,
+ "correspondingAdapterId": null,
+ "eventGrounding": null,
+ "eventSchema": {
+ "eventProperties": [
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current density of the fluid",
+ "elementId": "sp:eventproperty:Ruvqlo",
+ "label": "Density",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "density",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null,
+ "id": 1510309651547
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current mass flow in the sensor",
+ "elementId": "sp:eventproperty:NyTjgp",
+ "label": "Mass Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "mass_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null,
+ "id": 4855204620884
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The ID of the sensor",
+ "elementId": "sp:eventproperty:ThjvdL",
+ "label": "Sensor ID",
+ "propertyScope": "DIMENSION_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensorId",
+ "semanticType":
"https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+ "valueSpecification": null,
+ "id": 3260657832066
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Any fault flags of the sensors",
+ "elementId": "sp:eventproperty:vxokoH",
+ "label": "Sensor Fault Flags",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensor_fault_flags",
+ "semanticType": "http://schema.org/Boolean",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+ "valueSpecification": null,
+ "id": 6931662104350
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current temperature in degrees celsius",
+ "elementId": "sp:eventproperty:cmnXbe",
+ "label": "Temperature",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "temperature",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": {
+ "@class": "org.apache.streampipes.model.schema.QuantitativeValue",
+ "maxValue": 100,
+ "minValue": 0,
+ "step": 0.1
+ },
+ "id": 1980836171418
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The current timestamp value",
+ "elementId": "sp:eventproperty:AYjMkj",
+ "label": "Timestamp",
+ "propertyScope": "HEADER_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "timestamp",
+ "semanticType": "http://schema.org/DateTime",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+ "valueSpecification": null,
+ "id": 5857972009985
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current volume flow",
+ "elementId": "sp:eventproperty:ORzRsw",
+ "label": "Volume Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "volume_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null,
+ "id": 64305844251
+ }
+ ]
+ },
+ "index": 0
+ },
+ "deploymentConfiguration": {
+ "desiredServiceTags": [],
+ "selectedEndpointUrl": null
+ },
+ "eventGrounding": {
+ "transportProtocols": []
+ },
+ "eventSchema": {
+ "eventProperties": []
+ },
+ "icon": null,
+ "rules": [],
+ "running": false,
+ "schemaRules": [],
+ "selectedEndpointUrl": null,
+ "streamRules": [],
+ "valueRules": []
+}
\ No newline at end of file
diff --git a/streampipes-client-e2e/go-client-e2e/adapter_test.go
b/streampipes-client-e2e/go-client-e2e/adapter_test.go
new file mode 100644
index 0000000000..88d41fcca2
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/adapter_test.go
@@ -0,0 +1,134 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package go_client_e2e
+
+import (
+ "go-client-e2e/utils"
+ "os"
+ "testing"
+)
+
+// Placeholder tokens used by the e2e fixtures. TestCreatePipeline replaces
+// E2E_ADAPTER_ID, E2E_ADAPTER_NAME, E2E_STREAM_REV and
+// E2E_ADAPTER_OUT_TOPIC_NAME in pipelines/pipelines.json with the values of
+// the adapter that was just created.
+//
+// NOTE(review): E2E_HOST_NAME and E2E_PORT are not referenced by the tests in
+// this package as far as visible here. E2E_PORT's value includes the literal
+// double quotes -- presumably so a quoted JSON placeholder can be swapped for
+// a bare number; confirm before relying on it.
+const (
+ E2E_ADAPTER_ID = "e2e-adapter-id"
+ E2E_ADAPTER_NAME = "e2e-adapter-name"
+ E2E_STREAM_REV = "e2e-stream-rev"
+ E2E_HOST_NAME = "e2e-host-name"
+ E2E_ADAPTER_OUT_TOPIC_NAME = "e2e-adapter-out-topic-name"
+ E2E_PORT = "\"e2e-port\""
+)
+
+// TestCreateAdapter removes every existing adapter (via TestDeleteAdapter) and
+// then creates a new one from the adapter/machine.json fixture. Any failure is
+// logged and aborts the whole test binary with exit code 1.
+func TestCreateAdapter(t *testing.T) {
+	// Start from a clean slate so the created adapter is the only one present.
+	TestDeleteAdapter(t)
+
+	client, clientErr := utils.CreateStreamPipesClient()
+	if clientErr != nil {
+		t.Log(clientErr)
+		os.Exit(1)
+	}
+
+	payload := utils.CreateData("adapter/machine.json")
+	if createErr := client.Adapter().CreateAdapter(payload); createErr != nil {
+		t.Log(createErr)
+		os.Exit(1)
+	}
+}
+
+// TestGetAdapter lists all adapters through the client and aborts the test
+// binary with exit code 1 when the listing fails or comes back empty.
+func TestGetAdapter(t *testing.T) {
+	client, clientErr := utils.CreateStreamPipesClient()
+	if clientErr != nil {
+		t.Error(clientErr)
+		os.Exit(1)
+	}
+
+	found, listErr := client.Adapter().GetAllAdapter()
+	switch {
+	case listErr != nil:
+		t.Error(listErr)
+		os.Exit(1)
+	case len(found) == 0:
+		t.Log("adapter is null")
+		os.Exit(1)
+	}
+}
+
+// TestStartAdapter starts the first adapter returned by GetAllAdapter.
+//
+// Every failure is reported through t and then aborts the whole test binary
+// via os.Exit(1), matching the fail-fast style of the other e2e tests.
+func TestStartAdapter(t *testing.T) {
+	streamPipesClient, err := utils.CreateStreamPipesClient()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	adapters, err := streamPipesClient.Adapter().GetAllAdapter()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	if len(adapters) == 0 {
+		t.Error("adapter is null")
+		os.Exit(1)
+	}
+	// Report the error returned by the start call itself; the previous code
+	// logged the (nil) error of the earlier listing call instead.
+	if err = streamPipesClient.Adapter().StartSingleAdapter(adapters[0].ElementID); err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+}
+
+// TestStopAdapter stops every adapter returned by GetAllAdapter and then
+// re-reads each one to verify that it no longer reports Running.
+//
+// Every failure is reported through t and then aborts the whole test binary
+// via os.Exit(1).
+func TestStopAdapter(t *testing.T) {
+	streamPipesClient, err := utils.CreateStreamPipesClient()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	adapters, err := streamPipesClient.Adapter().GetAllAdapter()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	for _, adapter := range adapters {
+		if err = streamPipesClient.Adapter().StopSingleAdapter(adapter.ElementID); err != nil {
+			t.Error(err)
+			os.Exit(1)
+		}
+		stopped, getErr := streamPipesClient.Adapter().GetSingleAdapter(adapter.ElementID)
+		if getErr != nil {
+			t.Error(getErr)
+			os.Exit(1)
+		}
+		// Checked separately from the lookup error so that a still-running
+		// adapter yields a meaningful message instead of a logged nil error.
+		if stopped.Running {
+			t.Errorf("adapter %v is still running after being stopped", adapter.ElementID)
+			os.Exit(1)
+		}
+	}
+}
+
+// TestDeleteAdapter deletes every adapter returned by GetAllAdapter. Other
+// tests in this package call it directly to reset the adapter state.
+//
+// Every failure is reported through t and then aborts the whole test binary
+// via os.Exit(1).
+func TestDeleteAdapter(t *testing.T) {
+	streamPipesClient, err := utils.CreateStreamPipesClient()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	adapters, err := streamPipesClient.Adapter().GetAllAdapter()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	for _, adapter := range adapters {
+		// Report the delete error itself; the previous code logged the (nil)
+		// error of the listing call instead.
+		if err = streamPipesClient.Adapter().DeleteSingleAdapter(adapter.ElementID); err != nil {
+			t.Error(err)
+			os.Exit(1)
+		}
+	}
+}
diff --git a/streampipes-client-e2e/go-client-e2e/datalake_test.go
b/streampipes-client-e2e/go-client-e2e/datalake_test.go
new file mode 100644
index 0000000000..6168139ed6
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/datalake_test.go
@@ -0,0 +1,52 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package go_client_e2e
+
+import (
+ "go-client-e2e/utils"
+ "os"
+ "testing"
+ "time"
+)
+
+// TestGetDataLake creates and starts the e2e pipeline, waits, and then checks
+// that at least one data lake measure is returned by the client. The pipeline
+// is stopped and deleted again before the result is asserted.
+//
+// Every failure is reported through t and then aborts the whole test binary
+// via os.Exit(1).
+func TestGetDataLake(t *testing.T) {
+	TestCreatePipeline(t)
+	TestStartPipeline(t)
+	streamPipesClient, err := utils.CreateStreamPipesClient()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+
+	// Wait BEFORE querying: sleeping after the measures were already fetched
+	// (as the previous code did) cannot influence the result.
+	time.Sleep(10 * time.Second)
+
+	measures, err := streamPipesClient.DataLakeMeasures().GetAllDataLakeMeasure()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+
+	// Clean up first; os.Exit below would skip any deferred cleanup.
+	TestStopPipeline(t)
+	TestDeletePipeline(t)
+
+	if len(measures) == 0 {
+		t.Error("No data lake measures found")
+		os.Exit(1)
+	}
+}
diff --git a/streampipes-client-e2e/go-client-e2e/go.mod
b/streampipes-client-e2e/go-client-e2e/go.mod
new file mode 100644
index 0000000000..f7c1cd0cc1
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/go.mod
@@ -0,0 +1,23 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+module go-client-e2e
+
+go 1.21.6
+
+require github.com/apache/streampipes/streampipes-client-go v0.0.0 // indirect
+replace "github.com/apache/streampipes/streampipes-client-go" =>
"../../streampipes-client-go"
\ No newline at end of file
diff --git a/streampipes-client-e2e/go-client-e2e/pipeline_test.go
b/streampipes-client-e2e/go-client-e2e/pipeline_test.go
new file mode 100644
index 0000000000..e85fc375fe
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/pipeline_test.go
@@ -0,0 +1,138 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package go_client_e2e
+
+import (
+ "go-client-e2e/utils"
+ "os"
+ "strings"
+ "testing"
+)
+
+// TestCreatePipeline removes existing pipelines and adapters, creates and
+// starts a fresh adapter, substitutes that adapter's values for the
+// placeholders in pipelines/pipelines.json and submits the resulting pipeline
+// through the client.
+//
+// Every failure is reported through t and then aborts the whole test binary
+// via os.Exit(1).
+func TestCreatePipeline(t *testing.T) {
+	TestDeletePipeline(t)
+	TestCreateAdapter(t)
+
+	streamPipesClient, err := utils.CreateStreamPipesClient()
+	if err != nil {
+		// Previously the process exited here without reporting the cause.
+		t.Error(err)
+		os.Exit(1)
+	}
+	TestStartAdapter(t)
+	adapters, err := streamPipesClient.Adapter().GetAllAdapter()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	if len(adapters) == 0 {
+		t.Error("adapter is null")
+		os.Exit(1)
+	}
+	adapter := adapters[0]
+
+	// Guard the index below: an adapter without any transport protocol would
+	// otherwise crash the test binary with an index-out-of-range panic.
+	protocols := adapter.EventGrounding.TransportProtocols
+	if len(protocols) == 0 {
+		t.Errorf("adapter %v has no transport protocol, cannot resolve its output topic", adapter.ElementID)
+		os.Exit(1)
+	}
+
+	data := utils.CreateData("pipelines/pipelines.json")
+	pipeline := string(data)
+	pipeline = strings.ReplaceAll(pipeline, E2E_ADAPTER_ID, adapter.ElementID)
+	pipeline = strings.ReplaceAll(pipeline, E2E_ADAPTER_NAME, adapter.Name)
+	pipeline = strings.ReplaceAll(pipeline, E2E_STREAM_REV, adapter.Rev)
+	pipeline = strings.ReplaceAll(pipeline, E2E_ADAPTER_OUT_TOPIC_NAME, protocols[0].TopicDefinition.ActualTopicName)
+
+	message, statusErr := streamPipesClient.Pipeline().CreatePipeline([]byte(pipeline))
+	// The two failure modes are handled separately so that message is only
+	// inspected once the call itself is known to have succeeded.
+	if statusErr != nil {
+		t.Error(statusErr)
+		os.Exit(1)
+	}
+	if !message.Success {
+		t.Error("pipeline creation was not reported as successful")
+		os.Exit(1)
+	}
+}
+
+// TestGetPipeline lists all pipelines through the client and aborts the test
+// binary with exit code 1 when the listing fails or comes back empty.
+func TestGetPipeline(t *testing.T) {
+	client, clientErr := utils.CreateStreamPipesClient()
+	if clientErr != nil {
+		t.Error(clientErr)
+		os.Exit(1)
+	}
+
+	found, listErr := client.Pipeline().GetAllPipeline()
+	switch {
+	case listErr != nil:
+		t.Error(listErr)
+		os.Exit(1)
+	case len(found) == 0:
+		t.Log("pipelines is null")
+		os.Exit(1)
+	}
+}
+
+// TestStartPipeline starts the first pipeline returned by GetAllPipeline and
+// requires the returned status to report success.
+//
+// Every failure is reported through t and then aborts the whole test binary
+// via os.Exit(1).
+func TestStartPipeline(t *testing.T) {
+	streamPipesClient, err := utils.CreateStreamPipesClient()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	pipelines, err := streamPipesClient.Pipeline().GetAllPipeline()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	if len(pipelines) == 0 {
+		t.Error("pipelines is null")
+		os.Exit(1)
+	}
+
+	status, statusErr := streamPipesClient.Pipeline().StartSinglePipeline(pipelines[0].PipelineId)
+	// A transport/decoding error is reported as such; status is only
+	// inspected once the call itself succeeded.
+	if statusErr != nil {
+		t.Error(statusErr)
+		os.Exit(1)
+	}
+	if !status.Success {
+		t.Error(status.Title)
+		os.Exit(1)
+	}
+}
+
+// TestStopPipeline stops every pipeline returned by GetAllPipeline and
+// requires each stop operation to report success.
+//
+// Every failure is reported through t and then aborts the whole test binary
+// via os.Exit(1).
+func TestStopPipeline(t *testing.T) {
+	streamPipesClient, err := utils.CreateStreamPipesClient()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	pipelines, err := streamPipesClient.Pipeline().GetAllPipeline()
+	// Previously this error was never checked, so a failed listing looked
+	// like "nothing to stop" and the test passed silently.
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	for _, pipeline := range pipelines {
+		operation, operationErr := streamPipesClient.Pipeline().StopSinglePipeline(pipeline.PipelineId)
+		if operationErr != nil {
+			t.Error(operationErr)
+			os.Exit(1)
+		}
+		if !operation.Success {
+			t.Errorf("stopping pipeline %v was not reported as successful", pipeline.PipelineId)
+			os.Exit(1)
+		}
+	}
+}
+
+// TestDeletePipeline deletes every pipeline returned by GetAllPipeline and
+// afterwards removes all adapters via TestDeleteAdapter. Other tests in this
+// package call it directly to reset the backend state.
+//
+// Every failure is reported through t and then aborts the whole test binary
+// via os.Exit(1).
+func TestDeletePipeline(t *testing.T) {
+	streamPipesClient, err := utils.CreateStreamPipesClient()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	pipelines, err := streamPipesClient.Pipeline().GetAllPipeline()
+	if err != nil {
+		t.Error(err)
+		os.Exit(1)
+	}
+	for _, pipeline := range pipelines {
+		// Report the delete error itself; the previous code logged the (nil)
+		// error of the listing call instead.
+		if err = streamPipesClient.Pipeline().DeleteSinglePipeline(pipeline.PipelineId); err != nil {
+			t.Error(err)
+			os.Exit(1)
+		}
+	}
+	TestDeleteAdapter(t)
+}
diff --git a/streampipes-client-e2e/go-client-e2e/pipelines/pipelines.json
b/streampipes-client-e2e/go-client-e2e/pipelines/pipelines.json
new file mode 100644
index 0000000000..3cebc73a5f
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/pipelines/pipelines.json
@@ -0,0 +1,960 @@
+{
+ "actions": [
+ {
+ "@class": "org.apache.streampipes.model.graph.DataSinkInvocation",
+ "appId": "org.apache.streampipes.sinks.internal.jvm.datalake",
+ "connectedTo": [
+ "jsplumb_1_j3BB"
+ ],
+ "description": "Stores events in the internal data lake.",
+ "dom": "jsplumb_2_Fahe",
+ "elementId": "sp:datasinkinvocation:MZnKuj:qULqX",
+ "includedAssets": [
+ "documentation.md",
+ "icon.png"
+ ],
+ "includedLocales": [
+ "strings.en"
+ ],
+ "includesAssets": true,
+ "includesLocales": true,
+ "internallyManaged": false,
+ "name": "Data Lake",
+ "rev": null,
+ "version": 2,
+ "belongsTo": "sp:org.apache.streampipes.sinks.internal.jvm.datalake",
+ "configured": true,
+ "correspondingPipeline": null,
+ "correspondingUser": null,
+ "inputStreams": [
+ {
+ "@class": "org.apache.streampipes.model.SpDataStream",
+ "appId": "urn:streampipes.apache.org:eventstream:mqYwaw",
+ "connectedTo": null,
+ "description": null,
+ "dom": null,
+ "elementId": "urn:streampipes.apache.org:eventstream:mqYwaw",
+ "includedAssets": [],
+ "includedLocales": [],
+ "includesAssets": false,
+ "includesLocales": false,
+ "internallyManaged": false,
+ "name": null,
+ "rev": null,
+ "category": null,
+ "correspondingAdapterId": null,
+ "eventGrounding": {
+ "transportProtocols": [
+ {
+ "@class":
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+ "brokerHostname": "kafka",
+ "elementId": null,
+ "topicDefinition": {
+ "@class":
"org.apache.streampipes.model.grounding.SimpleTopicDefinition",
+ "actualTopicName":
"org-apache-streampipes-internal-prbfXSLBvVHpoeX"
+ },
+ "acks": null,
+ "batchSize": null,
+ "groupId": null,
+ "kafkaPort": 9092,
+ "lingerMs": null,
+ "maxRequestSize": null,
+ "messageMaxBytes": null,
+ "offset": null,
+ "zookeeperHost": "zookeeper",
+ "zookeeperPort": 2181
+ }
+ ]
+ },
+ "eventSchema": {
+ "eventProperties": [
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current density of the fluid",
+ "elementId": "sp:eventproperty:Ruvqlo",
+ "label": "Density",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "density",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current mass flow in the sensor",
+ "elementId": "sp:eventproperty:NyTjgp",
+ "label": "Mass Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "mass_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Any fault flags of the sensors",
+ "elementId": "sp:eventproperty:vxokoH",
+ "label": "Sensor Fault Flags",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensor_fault_flags",
+ "semanticType": "http://schema.org/Boolean",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The ID of the sensor",
+ "elementId": "sp:eventproperty:ThjvdL",
+ "label": "Sensor ID",
+ "propertyScope": "DIMENSION_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensorId",
+ "semanticType":
"https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current temperature in degrees celsius",
+ "elementId": "sp:eventproperty:cmnXbe",
+ "label": "Temperature",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "temperature",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": {
+ "@class":
"org.apache.streampipes.model.schema.QuantitativeValue",
+ "maxValue": 100,
+ "minValue": 0,
+ "step": 0.1
+ }
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The current timestamp value",
+ "elementId": "sp:eventproperty:AYjMkj",
+ "label": "Timestamp",
+ "propertyScope": "HEADER_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "timestamp",
+ "semanticType": "http://schema.org/DateTime",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current volume flow",
+ "elementId": "sp:eventproperty:ORzRsw",
+ "label": "Volume Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "volume_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ }
+ ]
+ },
+ "index": 0
+ }
+ ],
+ "selectedEndpointUrl": null,
+ "serviceTagPrefix": "DATA_SINK",
+ "staticProperties": [
+ {
+ "@class":
"org.apache.streampipes.model.staticproperty.MappingPropertyUnary",
+ "description": "The value which contains a timestamp",
+ "internalName": "timestamp_mapping",
+ "label": "Timestamp Field",
+ "optional": false,
+ "staticPropertyType": "MappingPropertyUnary",
+ "mapsFromOptions": [
+ "s0::timestamp"
+ ],
+ "propertyScope": "NONE",
+ "requirementSelector": "r0::timestamp_mapping",
+ "selectedProperty": "s0::timestamp"
+ },
+ {
+ "@class":
"org.apache.streampipes.model.staticproperty.FreeTextStaticProperty",
+ "description": "The name of the identifier under which the data is to be stored.",
+ "internalName": "db_measurement",
+ "label": "Identifier",
+ "optional": false,
+ "staticPropertyType": "FreeTextStaticProperty",
+ "htmlAllowed": false,
+ "htmlFontFormat": false,
+ "mapsTo": null,
+ "multiLine": false,
+ "placeholdersSupported": false,
+ "requiredDatatype": "http://www.w3.org/2001/XMLSchema#string",
+ "requiredDomainProperty": null,
+ "value": "test",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.staticproperty.OneOfStaticProperty",
+ "description": "Update existing schemas with the new one or extend the existing schema with new properties",
+ "internalName": "schema_update",
+ "label": "Schema Update",
+ "optional": false,
+ "staticPropertyType": "OneOfStaticProperty",
+ "horizontalRendering": false,
+ "options": [
+ {
+ "elementId": "sp:option:wzqPbk",
+ "internalName": null,
+ "name": "Update schema",
+ "selected": true
+ },
+ {
+ "elementId": "sp:option:XmVjRp",
+ "internalName": null,
+ "name": "Extend existing schema",
+ "selected": false
+ }
+ ]
+ },
+ {
+ "@class":
"org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty",
+ "description": "Selected fields will be stored as dimensions.",
+ "internalName": "dimensions_selection",
+ "label": "Dimensions",
+ "optional": false,
+ "staticPropertyType": "RuntimeResolvableAnyStaticProperty",
+ "horizontalRendering": false,
+ "options": [
+ {
+ "elementId": "sp:option:paBEDz",
+ "internalName": null,
+ "name": "sensor_fault_flags",
+ "selected": false
+ },
+ {
+ "elementId": "sp:option:JYOYId",
+ "internalName": null,
+ "name": "sensorId",
+ "selected": true
+ }
+ ],
+ "dependsOn": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.staticproperty.SlideToggleStaticProperty",
+ "description": "Fields having the same value than the previous event are not stored. This only affects measurements, not tags.",
+ "internalName": "ignore_duplicates",
+ "label": "Ignore duplicates",
+ "optional": false,
+ "staticPropertyType": "SlideToggleStaticProperty",
+ "defaultValue": false,
+ "selected": false
+ }
+ ],
+ "streamRequirements": [
+ {
+ "@class": "org.apache.streampipes.model.SpDataStream",
+ "appId": "urn:streampipes.apache.org:eventstream:mqYwaw",
+ "connectedTo": null,
+ "description": null,
+ "dom": null,
+ "elementId": "urn:streampipes.apache.org:eventstream:mqYwaw",
+ "includedAssets": [],
+ "includedLocales": [],
+ "includesAssets": false,
+ "includesLocales": false,
+ "internallyManaged": false,
+ "name": null,
+ "rev": null,
+ "category": null,
+ "correspondingAdapterId": null,
+ "eventGrounding": null,
+ "eventSchema": {
+ "eventProperties": [
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": null,
+ "elementId": "sp:eventproperty:LrdSOF",
+ "label": null,
+ "propertyScope": null,
+ "runtimeId": null,
+ "runtimeName": "timestamp_mapping",
+ "semanticType": "http://schema.org/DateTime",
+ "measurementUnit": null,
+ "runtimeType": null,
+ "valueSpecification": null
+ }
+ ]
+ },
+ "index": 0
+ }
+ ],
+ "supportedGrounding": {
+ "transportProtocols": [
+ {
+ "@class":
"org.apache.streampipes.model.grounding.MqttTransportProtocol",
+ "brokerHostname": null,
+ "elementId": "sp:transportprotocol:IxejrD",
+ "topicDefinition": null,
+ "port": 0
+ },
+ {
+ "@class":
"org.apache.streampipes.model.grounding.NatsTransportProtocol",
+ "brokerHostname": null,
+ "elementId": "sp:transportprotocol:hMYAXY",
+ "topicDefinition": null,
+ "port": 0
+ },
+ {
+ "@class":
"org.apache.streampipes.model.grounding.JmsTransportProtocol",
+ "brokerHostname": null,
+ "elementId": "sp:transportprotocol:ycSaOT",
+ "topicDefinition": null,
+ "port": 0
+ },
+ {
+ "@class":
"org.apache.streampipes.model.grounding.PulsarTransportProtocol",
+ "brokerHostname": "pulsar://localhost:6650",
+ "elementId": "sp:transportprotocol:lvMQWc",
+ "topicDefinition": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+ "brokerHostname": null,
+ "elementId": "sp:transportprotocol:EiyDHT",
+ "topicDefinition": null,
+ "acks": null,
+ "batchSize": null,
+ "groupId": null,
+ "kafkaPort": 0,
+ "lingerMs": null,
+ "maxRequestSize": null,
+ "messageMaxBytes": null,
+ "offset": null,
+ "zookeeperHost": null,
+ "zookeeperPort": 0
+ }
+ ]
+ },
+ "uncompleted": false,
+ "category": [
+ "INTERNAL"
+ ]
+ }
+ ],
+ "description": "",
+ "name": "test",
+ "sepas": [
+ {
+ "@class": "org.apache.streampipes.model.graph.DataProcessorInvocation",
+ "appId":
"org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
+ "connectedTo": [
+ "jsplumb_0_x6Gz"
+ ],
+ "description": "Inverts the boolean value of the selected field",
+ "dom": "jsplumb_1_j3BB",
+ "elementId": "sp:dataprocessorinvocation:JOqFCM:R0okK",
+ "includedAssets": [
+ "documentation.md",
+ "icon.png"
+ ],
+ "includedLocales": [
+ "strings.en"
+ ],
+ "includesAssets": true,
+ "includesLocales": true,
+ "internallyManaged": false,
+ "name": "Boolean Inverter",
+ "rev": null,
+ "version": 0,
+ "belongsTo":
"sp:org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
+ "configured": true,
+ "correspondingPipeline": null,
+ "correspondingUser": null,
+ "inputStreams": [
+ {
+ "@class": "org.apache.streampipes.model.SpDataStream",
+ "appId": "urn:streampipes.apache.org:eventstream:VPyWtS",
+ "connectedTo": null,
+ "description": null,
+ "dom": null,
+ "elementId": "urn:streampipes.apache.org:eventstream:VPyWtS",
+ "includedAssets": [],
+ "includedLocales": [],
+ "includesAssets": false,
+ "includesLocales": false,
+ "internallyManaged": false,
+ "name": null,
+ "rev": null,
+ "category": null,
+ "correspondingAdapterId": null,
+ "eventGrounding": {
+ "transportProtocols": [
+ {
+ "@class":
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+ "brokerHostname": "kafka",
+ "elementId": "sp:transportprotocol:KoHuzT",
+ "topicDefinition": {
+ "@class":
"org.apache.streampipes.model.grounding.SimpleTopicDefinition",
+ "actualTopicName": "e2e-adapter-out-topic-name"
+ },
+ "acks": null,
+ "batchSize": null,
+ "groupId": null,
+ "kafkaPort": 9092,
+ "lingerMs": null,
+ "maxRequestSize": null,
+ "messageMaxBytes": null,
+ "offset": null,
+ "zookeeperHost": null,
+ "zookeeperPort": 0
+ }
+ ]
+ },
+ "eventSchema": {
+ "eventProperties": [
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current density of the fluid",
+ "elementId": "sp:eventproperty:Ruvqlo",
+ "label": "Density",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "density",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current mass flow in the sensor",
+ "elementId": "sp:eventproperty:NyTjgp",
+ "label": "Mass Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "mass_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The ID of the sensor",
+ "elementId": "sp:eventproperty:ThjvdL",
+ "label": "Sensor ID",
+ "propertyScope": "DIMENSION_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensorId",
+ "semanticType":
"https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Any fault flags of the sensors",
+ "elementId": "sp:eventproperty:vxokoH",
+ "label": "Sensor Fault Flags",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensor_fault_flags",
+ "semanticType": "http://schema.org/Boolean",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current temperature in degrees celsius",
+ "elementId": "sp:eventproperty:cmnXbe",
+ "label": "Temperature",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "temperature",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": {
+ "@class":
"org.apache.streampipes.model.schema.QuantitativeValue",
+ "maxValue": 100,
+ "minValue": 0,
+ "step": 0.1
+ }
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The current timestamp value",
+ "elementId": "sp:eventproperty:AYjMkj",
+ "label": "Timestamp",
+ "propertyScope": "HEADER_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "timestamp",
+ "semanticType": "http://schema.org/DateTime",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current volume flow",
+ "elementId": "sp:eventproperty:ORzRsw",
+ "label": "Volume Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "volume_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ }
+ ]
+ },
+ "index": 0
+ }
+ ],
+ "selectedEndpointUrl": null,
+ "serviceTagPrefix": "DATA_PROCESSOR",
+ "staticProperties": [
+ {
+ "@class":
"org.apache.streampipes.model.staticproperty.MappingPropertyUnary",
+ "description": "The field which should be inverted",
+ "internalName": "invert-field",
+ "label": "Invert field",
+ "optional": false,
+ "staticPropertyType": "MappingPropertyUnary",
+ "mapsFromOptions": [
+ "s0::sensor_fault_flags"
+ ],
+ "propertyScope": "NONE",
+ "requirementSelector": "r0::invert-field",
+ "selectedProperty": "s0::sensor_fault_flags"
+ }
+ ],
+ "streamRequirements": [
+ {
+ "@class": "org.apache.streampipes.model.SpDataStream",
+ "appId": "urn:streampipes.apache.org:eventstream:VPyWtS",
+ "connectedTo": null,
+ "description": null,
+ "dom": null,
+ "elementId": "urn:streampipes.apache.org:eventstream:VPyWtS",
+ "includedAssets": [],
+ "includedLocales": [],
+ "includesAssets": false,
+ "includesLocales": false,
+ "internallyManaged": false,
+ "name": null,
+ "rev": null,
+ "category": null,
+ "correspondingAdapterId": null,
+ "eventGrounding": null,
+ "eventSchema": {
+ "eventProperties": [
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": null,
+ "elementId": "sp:eventproperty:sHuzzM",
+ "label": null,
+ "propertyScope": null,
+ "runtimeId": null,
+ "runtimeName": "invert-field",
+ "semanticType": null,
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+ "valueSpecification": null
+ }
+ ]
+ },
+ "index": 0
+ }
+ ],
+ "supportedGrounding": {
+ "transportProtocols": [
+ {
+ "@class":
"org.apache.streampipes.model.grounding.MqttTransportProtocol",
+ "brokerHostname": null,
+ "elementId": "sp:transportprotocol:IxejrD",
+ "topicDefinition": null,
+ "port": 0
+ },
+ {
+ "@class":
"org.apache.streampipes.model.grounding.NatsTransportProtocol",
+ "brokerHostname": null,
+ "elementId": "sp:transportprotocol:hMYAXY",
+ "topicDefinition": null,
+ "port": 0
+ },
+ {
+ "@class":
"org.apache.streampipes.model.grounding.JmsTransportProtocol",
+ "brokerHostname": null,
+ "elementId": "sp:transportprotocol:ycSaOT",
+ "topicDefinition": null,
+ "port": 0
+ },
+ {
+ "@class":
"org.apache.streampipes.model.grounding.PulsarTransportProtocol",
+ "brokerHostname": "pulsar://localhost:6650",
+ "elementId": "sp:transportprotocol:lvMQWc",
+ "topicDefinition": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+ "brokerHostname": null,
+ "elementId": "sp:transportprotocol:EiyDHT",
+ "topicDefinition": null,
+ "acks": null,
+ "batchSize": null,
+ "groupId": null,
+ "kafkaPort": 0,
+ "lingerMs": null,
+ "maxRequestSize": null,
+ "messageMaxBytes": null,
+ "offset": null,
+ "zookeeperHost": null,
+ "zookeeperPort": 0
+ }
+ ]
+ },
+ "uncompleted": false,
+ "category": [
+ "BOOLEAN_OPERATOR"
+ ],
+ "outputStrategies": [
+ {
+ "@class": "org.apache.streampipes.model.output.KeepOutputStrategy",
+ "name": null,
+ "renameRules": [],
+ "eventName": null,
+ "keepBoth": false
+ }
+ ],
+ "outputStream": {
+ "@class": "org.apache.streampipes.model.SpDataStream",
+ "appId": "urn:streampipes.apache.org:eventstream:zPxOLe",
+ "connectedTo": null,
+ "description": null,
+ "dom": null,
+ "elementId": "urn:streampipes.apache.org:eventstream:zPxOLe",
+ "includedAssets": [],
+ "includedLocales": [],
+ "includesAssets": false,
+ "includesLocales": false,
+ "internallyManaged": false,
+ "name": null,
+ "rev": null,
+ "category": null,
+ "correspondingAdapterId": null,
+ "eventGrounding": {
+ "transportProtocols": [
+ {
+ "@class":
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+ "brokerHostname": "kafka",
+ "elementId": null,
+ "topicDefinition": {
+ "@class":
"org.apache.streampipes.model.grounding.SimpleTopicDefinition",
+ "actualTopicName":
"org-apache-streampipes-internal-prbfXSLBvVHpoeX"
+ },
+ "acks": null,
+ "batchSize": null,
+ "groupId": null,
+ "kafkaPort": 9092,
+ "lingerMs": null,
+ "maxRequestSize": null,
+ "messageMaxBytes": null,
+ "offset": null,
+ "zookeeperHost": "zookeeper",
+ "zookeeperPort": 2181
+ }
+ ]
+ },
+ "eventSchema": {
+ "eventProperties": [
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current density of the fluid",
+ "elementId": "sp:eventproperty:Ruvqlo",
+ "label": "Density",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "density",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current mass flow in the sensor",
+ "elementId": "sp:eventproperty:NyTjgp",
+ "label": "Mass Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "mass_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The ID of the sensor",
+ "elementId": "sp:eventproperty:ThjvdL",
+ "label": "Sensor ID",
+ "propertyScope": "DIMENSION_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensorId",
+ "semanticType":
"https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Any fault flags of the sensors",
+ "elementId": "sp:eventproperty:vxokoH",
+ "label": "Sensor Fault Flags",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensor_fault_flags",
+ "semanticType": "http://schema.org/Boolean",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current temperature in degrees celsius",
+ "elementId": "sp:eventproperty:cmnXbe",
+ "label": "Temperature",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "temperature",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": {
+ "@class":
"org.apache.streampipes.model.schema.QuantitativeValue",
+ "maxValue": 100,
+ "minValue": 0,
+ "step": 0.1
+ }
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The current timestamp value",
+ "elementId": "sp:eventproperty:AYjMkj",
+ "label": "Timestamp",
+ "propertyScope": "HEADER_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "timestamp",
+ "semanticType": "http://schema.org/DateTime",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current volume flow",
+ "elementId": "sp:eventproperty:ORzRsw",
+ "label": "Volume Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "volume_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ }
+ ]
+ },
+ "index": 0
+ }
+ }
+ ],
+ "streams": [
+ {
+ "@class": "org.apache.streampipes.model.SpDataStream",
+ "appId": "e2e-adapter-id",
+ "connectedTo": null,
+ "description": "",
+ "dom": "jsplumb_0_x6Gz",
+ "elementId": "e2e-adapter-id",
+ "includedAssets": [],
+ "includedLocales": [],
+ "includesAssets": false,
+ "includesLocales": false,
+ "internallyManaged": true,
+ "name": "e2e-adapter-name",
+ "rev": "e2e-stream-rev",
+ "category": null,
+ "correspondingAdapterId": "sp:adapterdescription:OiTAHK",
+ "eventGrounding": {
+ "transportProtocols": [
+ {
+ "@class":
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+ "brokerHostname": "kafka",
+ "elementId": "sp:transportprotocol:KoHuzT",
+ "topicDefinition": {
+ "@class":
"org.apache.streampipes.model.grounding.SimpleTopicDefinition",
+ "actualTopicName": "e2e-adapter-out-topic-name"
+ },
+ "acks": null,
+ "batchSize": null,
+ "groupId": null,
+ "kafkaPort": 9092,
+ "lingerMs": null,
+ "maxRequestSize": null,
+ "messageMaxBytes": null,
+ "offset": null,
+ "zookeeperHost": null,
+ "zookeeperPort": 0
+ }
+ ]
+ },
+ "eventSchema": {
+ "eventProperties": [
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current density of the fluid",
+ "elementId": "sp:eventproperty:Ruvqlo",
+ "label": "Density",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "density",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current mass flow in the sensor",
+ "elementId": "sp:eventproperty:NyTjgp",
+ "label": "Mass Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "mass_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The ID of the sensor",
+ "elementId": "sp:eventproperty:ThjvdL",
+ "label": "Sensor ID",
+ "propertyScope": "DIMENSION_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensorId",
+ "semanticType":
"https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Any fault flags of the sensors",
+ "elementId": "sp:eventproperty:vxokoH",
+ "label": "Sensor Fault Flags",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "sensor_fault_flags",
+ "semanticType": "http://schema.org/Boolean",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current temperature in degrees celsius",
+ "elementId": "sp:eventproperty:cmnXbe",
+ "label": "Temperature",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "temperature",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": {
+ "@class":
"org.apache.streampipes.model.schema.QuantitativeValue",
+ "maxValue": 100,
+ "minValue": 0,
+ "step": 0.1
+ }
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "The current timestamp value",
+ "elementId": "sp:eventproperty:AYjMkj",
+ "label": "Timestamp",
+ "propertyScope": "HEADER_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "timestamp",
+ "semanticType": "http://schema.org/DateTime",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+ "valueSpecification": null
+ },
+ {
+ "@class":
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+ "additionalMetadata": {},
+ "description": "Denotes the current volume flow",
+ "elementId": "sp:eventproperty:ORzRsw",
+ "label": "Volume Flow",
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "runtimeId": null,
+ "runtimeName": "volume_flow",
+ "semanticType": "http://schema.org/Number",
+ "measurementUnit": null,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "valueSpecification": null
+ }
+ ]
+ },
+ "index": 0
+ }
+ ],
+ "valid": true
+}
\ No newline at end of file
diff --git
a/streampipes-client-e2e/go-client-e2e/utils/create_machine_apadter.go
b/streampipes-client-e2e/go-client-e2e/utils/create_machine_apadter.go
new file mode 100644
index 0000000000..2f9034c1f1
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/utils/create_machine_apadter.go
@@ -0,0 +1,37 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package utils
+
+import (
+ "io/ioutil"
+ "os"
+)
+
+// CreateData returns the complete contents of the file at jsonFilePath.
+// It returns nil when the file cannot be opened or read.
+func CreateData(jsonFilePath string) []byte {
+	file, openErr := os.Open(jsonFilePath)
+	if openErr != nil {
+		return nil
+	}
+	defer file.Close()
+
+	if content, readErr := ioutil.ReadAll(file); readErr == nil {
+		return content
+	}
+	return nil
+}
diff --git a/streampipes-client-e2e/go-client-e2e/utils/streampipes_client.go
b/streampipes-client-e2e/go-client-e2e/utils/streampipes_client.go
new file mode 100644
index 0000000000..90c2b87203
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/utils/streampipes_client.go
@@ -0,0 +1,36 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package utils
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/apache/streampipes/streampipes-client-go/streampipes"
+	"github.com/apache/streampipes/streampipes-client-go/streampipes/config"
+)
+
+func CreateStreamPipesClient() (*streampipes.StreamPipesClient, error) {
+ length := len(os.Args)
+ clientConfig := config.StreamPipesClientConfig{
+ Url: "http://" + os.Args[length-4] + ":" + os.Args[length-3],
+ Credential: config.StreamPipesApiKeyCredentials{
+ UserName: os.Args[length-1],
+ ApiKey: os.Args[length-2],
+ },
+ }
+ return streampipes.NewStreamPipesClient(clientConfig)
+}
diff --git a/streampipes-client-e2e/tool/go-client-e2e.sh
b/streampipes-client-e2e/tool/go-client-e2e.sh
new file mode 100644
index 0000000000..3b52e89e2e
--- /dev/null
+++ b/streampipes-client-e2e/tool/go-client-e2e.sh
@@ -0,0 +1,61 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Connection settings, filled in from the command line below.
+HOST=""
+PORT=""
+APIKEY=""
+API_KEY_USER_NAME=""
+
+# Parse "-h <host> -p <port> -u <user> -k <api-key>"; unrecognised words are
+# skipped. The loop is bounded by $# and every `shift 2` is guarded, because
+# bash leaves the parameters untouched when the shift count exceeds $#, which
+# made the former `while true` loop spin forever on a trailing option.
+while [ $# -gt 0 ]; do
+    case "$1" in
+        -h)
+            HOST="$2"
+            shift 2 || break
+            ;;
+        -p)
+            PORT="$2"
+            shift 2 || break
+            ;;
+        -u)
+            API_KEY_USER_NAME="$2"
+            shift 2 || break
+            ;;
+        -k)
+            APIKEY="$2"
+            shift 2 || break
+            ;;
+        "")
+            # an explicit empty word ends option parsing
+            break
+            ;;
+        *)
+            # skip anything unrecognised
+            shift
+            ;;
+    esac
+done
+
+# Run the Go e2e packages from the go-client-e2e directory. The four values
+# after -args are handed to the test binary in this exact order:
+# host, port, api key, user name.
+cd ../go-client-e2e || exit
+if ! go test -v ../go-client-e2e/... -args "$HOST" "$PORT" "$APIKEY" "$API_KEY_USER_NAME"; then
+    cd ../tool
+    echo "Error: go test failed"
+    exit 1
+fi
+cd ../tool || exit
+echo "All tests passed successfully"
\ No newline at end of file
diff --git a/streampipes-client-e2e/tool/install-element.sh
b/streampipes-client-e2e/tool/install-element.sh
new file mode 100644
index 0000000000..2b3c340f7a
--- /dev/null
+++ b/streampipes-client-e2e/tool/install-element.sh
@@ -0,0 +1,113 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Backend path that receives extension-installation requests.
+INSTALL_ELEMENT_URL="/streampipes-backend/api/v2/extension-installation"
+
+# Defaults; each can be overridden on the command line.
+HOST="127.0.0.1"
+PORT="8030"
+TOKEN=""
+
+
+# Parse "-host <ip> -port <port> -token <token>"; unrecognised words are
+# skipped. The loop is bounded by $# and every `shift 2` is guarded, because
+# bash leaves the parameters untouched when the shift count exceeds $#, which
+# made the former `while true` loop spin forever on a trailing option.
+while [ $# -gt 0 ]; do
+    case "$1" in
+        -host)
+            HOST="$2"
+            shift 2 || break
+            ;;
+        -port)
+            PORT="$2"
+            shift 2 || break
+            ;;
+        -token)
+            TOKEN="$2"
+            shift 2 || break
+            ;;
+        --help)
+            # -token carries the bearer token sent in the authorization header
+            echo "Usage: $0 [-host <ip>] [-port <port>] [-token <token>]"
+            exit 0
+            ;;
+        "")
+            # an explicit empty word ends option parsing
+            break
+            ;;
+        *)
+            # skip anything unrecognised
+            shift
+            ;;
+    esac
+done
+
+
+######################Adapters######################
+
+#machine
+installRequestBody='{
+ "appId":"org.apache.streampipes.connect.iiot.adapters.simulator.machine",
+ "publicElement":true,
+ "serviceTagPrefix":"ADAPTER"
+ }'
+
+response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL" \
+ -H "Content-Type: application/json" \
+ -H "authorization: Bearer $TOKEN" \
+ -d "$installRequestBody")
+if [ $? -ne 0 ]; then
+ echo "$response"
+ echo "Error install machine adapter"
+ exit 1
+fi
+
+
+
+######################processor######################
+
+#bool_inverter
+installRequestBody='{
+
"appId":"org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
+ "publicElement":true,
+ "serviceTagPrefix":"DATA_PROCESSOR"
+ }'
+
+response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL" \
+ -H "Content-Type: application/json" \
+ -H "authorization: Bearer $TOKEN" \
+ -d "$installRequestBody")
+if [ $? -ne 0 ]; then
+ echo "$response"
+ echo "Error install bool inverter processor"
+ exit 1
+fi
+
+
+######################sink######################
+#datalake
+installRequestBody='{
+ "appId":"org.apache.streampipes.sinks.internal.jvm.datalake",
+ "publicElement":true,
+ "serviceTagPrefix":"DATA_SINK"
+ }'
+
+response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL" \
+ -H "Content-Type: application/json" \
+ -H "authorization: Bearer $TOKEN" \
+ -d "$installRequestBody")
+if [ $? -ne 0 ]; then
+ echo "$response"
+ echo "Error install datalake"
+ exit 1
+fi
\ No newline at end of file
diff --git a/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
b/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
new file mode 100644
index 0000000000..cbab40355f
--- /dev/null
+++ b/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
@@ -0,0 +1,148 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Defaults; each can be overridden on the command line. E2E_TEST has no
+# default and must be supplied with -t.
+E2E_TEST=""
+HOST="127.0.0.1"
+PORT="8030"
+LOGIN_URL="/streampipes-backend/api/v2/auth/login"
+
+SP_USERNAME="[email protected]"
+SP_PASSWORD="admin"
+
+# Parse the options listed in the --help text; unrecognised words are
+# skipped. The loop is bounded by $# and every `shift 2` is guarded, because
+# bash leaves the parameters untouched when the shift count exceeds $#, which
+# made the former `while true` loop spin forever on a trailing option.
+while [ $# -gt 0 ]; do
+    case "$1" in
+        -u)
+            SP_USERNAME="$2"
+            shift 2 || break
+            ;;
+        -pw)
+            SP_PASSWORD="$2"
+            shift 2 || break
+            ;;
+        -h)
+            HOST="$2"
+            shift 2 || break
+            ;;
+        -p)
+            PORT="$2"
+            shift 2 || break
+            ;;
+        -t)
+            E2E_TEST="$2"
+            shift 2 || break
+            ;;
+        --help)
+            echo "Usage: $0 [-h <ip>] [-p <port>] [-u <username>] [-pw <password>] [-t <E2E_TEST>]"
+            exit 0
+            ;;
+        "")
+            # an explicit empty word ends option parsing
+            break
+            ;;
+        *)
+            # skip anything unrecognised
+            shift
+            ;;
+    esac
+done
+
+if [ E2E_TEST == "" ]; then
+ echo "-t is empty"
+ exit 1
+fi
+
+API_KEY_URL="/streampipes-backend/api/v2/users/$SP_USERNAME/tokens"
+API_KEY_USER_NAME="$SP_USERNAME"
+
+loginRequestBody='{
+ "username": "'"$SP_USERNAME"'",
+ "password": "'"$SP_PASSWORD"'"
+}'
+
+# Fail fast when nothing is listening on the configured backend address.
+# Probe $HOST:$PORT so that -h / -p overrides are honoured instead of always
+# checking localhost:8030.
+if nc -zv "$HOST" "$PORT"; then
+    echo "Port $PORT is open and listening."
+else
+    echo "Port $PORT is not open or not listening."
+    exit 1
+fi
+
+echo "start login"
+max_attempts=60
+attempt=1
+while [ $attempt -le $max_attempts ]
+do
+ response=$(curl -s -X POST "http://$HOST:$PORT$LOGIN_URL" \
+ -H "Content-Type: application/json" \
+ -d "$loginRequestBody")
+ if [ $? -eq 0 ]; then
+ echo "Login successful"
+ break
+ else
+ echo "$response"
+ echo "Error: Login request failed on attempt $attempt"
+ if [ $attempt -eq $max_attempts ]; then
+ echo "Max attempts reached. Exiting."
+ exit 0
+ else
+ echo "Retrying in 1 second..."
+ sleep 1
+ fi
+ fi
+ attempt=$((attempt+1))
+done
+
+echo "get token"
+# Pull the accessToken value out of the login response. With -n and the p
+# flag sed prints nothing when the key is absent, which the emptiness check
+# below turns into an error.
+accessToken=$(echo "$response" | sed -n 's/.*"accessToken":"\([^"]*\)".*/\1/p')
+if [ -z "$accessToken" ]; then
+ echo "Error: Failed to retrieve access token"
+ exit 1
+fi
+
+# JSON body for the API-key request; the token is named after the user.
+apiKeyRequestBody='{
+ "tokenName": "'"$API_KEY_USER_NAME"'"
+}'
+
+echo "install element"
+chmod +x ./install-element.sh
+./install-element.sh -host "$HOST" -port "$PORT" -token "$accessToken"
+
+echo "get apikey"
+# Request an API key for the logged-in user with the bearer token from login.
+if ! APIKEYRESP=$(curl -s -X POST "http://$HOST:$PORT$API_KEY_URL" \
+    -H "Content-Type: application/json" \
+    -H "authorization: Bearer $accessToken" \
+    -d "$apiKeyRequestBody"); then
+    echo "Error: API Key request failed"
+    exit 1
+fi
+# -n with the p flag prints only on a match, so a response without "rawToken"
+# yields an empty APIKEY and trips the check below. A plain substitution would
+# echo the whole response back and the check could never fire.
+APIKEY=$(echo "$APIKEYRESP" | sed -n 's/.*"rawToken":"\([^"]*\)".*/\1/p')
+if [ -z "$APIKEY" ]; then
+    echo "Error: Failed to retrieve API key"
+    exit 1
+fi
+
+echo "start e2e test"
+# Make the runner named by -t executable, then hand it the connection
+# settings and the freshly issued API key.
+chmod +x ./"$E2E_TEST"
+if ! ./"$E2E_TEST" -h "$HOST" -p "$PORT" -u "$API_KEY_USER_NAME" -k "$APIKEY"; then
+    echo "start $E2E_TEST failed"
+    exit 1
+fi
+
+echo "All tests passed successfully"
\ No newline at end of file
diff --git a/streampipes-client-go/streampipes/adapter_api.go
b/streampipes-client-go/streampipes/adapter_api.go
index 06fb42d789..d2f7eb7637 100644
--- a/streampipes-client-go/streampipes/adapter_api.go
+++ b/streampipes-client-go/streampipes/adapter_api.go
@@ -120,13 +120,9 @@ func (a *Adapter) DeleteSingleAdapter(adapterId string)
error {
return nil
}
-func (a *Adapter) CreateAdapter(adapters adapter.AdapterDescription) error {
+func (a *Adapter) CreateAdapter(adapters []byte) error {
endPointUrl := util.NewStreamPipesApiPath(a.config.Url,
"streampipes-backend/api/v2/connect/master/adapters", nil)
- body, err := serializer.NewAdapterSerializer().Marshal(adapters)
- if err != nil {
- return err
- }
- response, err := a.executeRequest("PUT", endPointUrl, body)
+ response, err := a.executeRequest("POST", endPointUrl, adapters)
if err != nil {
return err
}
@@ -158,8 +154,7 @@ func (a *Adapter) StopSingleAdapter(adapterId string) error
{
}
func (a *Adapter) StartSingleAdapter(adapterId string) error {
- endPointUrl := util.NewStreamPipesApiPath(a.config.Url,
"streampipes-backend/api/v2/pipelines", []string{adapterId, "start"})
-
+ endPointUrl := util.NewStreamPipesApiPath(a.config.Url,
"streampipes-backend/api/v2/connect/master/adapters", []string{adapterId,
"start"})
response, err := a.executeRequest("POST", endPointUrl, nil)
if err != nil {
return err
diff --git
a/streampipes-client-go/streampipes/internal/serializer/deserializer.go
b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
index 9c47fd3383..27283c85b4 100644
--- a/streampipes-client-go/streampipes/internal/serializer/deserializer.go
+++ b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
@@ -20,6 +20,8 @@ package serializer
import (
"encoding/json"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/adapter"
+ "log"
+ "strings"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/data_lake"
@@ -45,8 +47,10 @@ func NewDataLakeMeasuresDeserializer()
*DataLakeMeasuresDeserializer {
func (d DataLakeMeasuresDeserializer) Unmarshal(data []byte) (interface{},
error) {
var dataLakeMeasures []data_lake.DataLakeMeasure
- err := json.Unmarshal(data, &dataLakeMeasures)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&dataLakeMeasures); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return dataLakeMeasures, nil
@@ -60,8 +64,10 @@ func NewDataLakeMeasureDeserializer()
*DataLakeMeasureDeserializer {
func (d DataLakeMeasureDeserializer) Unmarshal(data []byte) (interface{},
error) {
var dataLakeMeasure data_lake.DataLakeMeasure
- err := json.Unmarshal(data, &dataLakeMeasure)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&dataLakeMeasure); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return dataLakeMeasure, nil
@@ -75,8 +81,10 @@ func NewDataSeriesDeserializer() *DataSeriesDeserializer {
func (d DataSeriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var dataSeries data_lake.DataSeries
- err := json.Unmarshal(data, &dataSeries)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&dataSeries); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return dataSeries, nil
@@ -89,12 +97,14 @@ func NewStreamPipesVersionDeserializer()
*StreamPipesVersionDeserializer {
}
func (d StreamPipesVersionDeserializer) Unmarshal(data []byte) (interface{},
error) {
- var dataSeries streampipes_version.Versions
- err := json.Unmarshal(data, &dataSeries)
- if err != nil {
+ var version streampipes_version.Versions
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&version); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
- return dataSeries, nil
+ return version, nil
}
type ResponseMessageDeserializer struct{}
@@ -105,8 +115,10 @@ func NewResponseMessageDeserializer()
*ResponseMessageDeserializer {
func (r ResponseMessageDeserializer) Unmarshal(data []byte) (interface{},
error) {
var responseMessage model.ResponseMessage
- err := json.Unmarshal(data, &responseMessage)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&responseMessage); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return responseMessage, nil
@@ -120,10 +132,15 @@ func NewPipelineCategoriesDeserializer()
*PipelineCategoriesDeserializer {
func (p PipelineCategoriesDeserializer) Unmarshal(data []byte) (interface{},
error) {
var pipelineCategory []pipeline.PipelineCategory
- err := json.Unmarshal(data, &pipelineCategory)
- if err != nil {
+
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+
+ if err := dec.Decode(&pipelineCategory); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Fatal(err)
return nil, err
}
+
return pipelineCategory, nil
}
@@ -135,8 +152,10 @@ func NewDataLakeDashboardDeserializer()
*DataLakeDashboardDeserializer {
func (d DataLakeDashboardDeserializer) Unmarshal(data []byte) (interface{},
error) {
var dashborad data_lake.Dashboard
- err := json.Unmarshal(data, &dashborad)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&dashborad); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return dashborad, nil
@@ -149,13 +168,15 @@ func NewDataLakeDashboardsDeserializer()
*DataLakeDashboardsDeserializer {
}
func (d DataLakeDashboardsDeserializer) Unmarshal(data []byte) (interface{},
error) {
- var dashborad []data_lake.Dashboard
- err := json.Unmarshal(data, &dashborad)
- if err != nil {
+ var dashborads []data_lake.Dashboard
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&dashborads); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
- return dashborad, nil
+ return dashborads, nil
}
type DataLakeWidgetDeserializer struct{}
@@ -166,8 +187,10 @@ func NewDataLakeWidgetDeserializer()
*DataLakeWidgetDeserializer {
func (d DataLakeWidgetDeserializer) Unmarshal(data []byte) (interface{},
error) {
var widget data_lake.DataExplorerWidgetModel
- err := json.Unmarshal(data, &widget)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&widget); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return widget, nil
@@ -180,12 +203,14 @@ func NewDataLakeWidgetsDeserializer()
*DataLakeWidgetsDeserializer {
}
func (d DataLakeWidgetsDeserializer) Unmarshal(data []byte) (interface{},
error) {
- var widget []data_lake.DataExplorerWidgetModel
- err := json.Unmarshal(data, &widget)
- if err != nil {
+ var widgets []data_lake.DataExplorerWidgetModel
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&widgets); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
- return widget, nil
+ return widgets, nil
}
type SpLogEntriesDeserializer struct{}
@@ -194,13 +219,15 @@ func NewSpLogEntriesDeserializer()
*SpLogEntriesDeserializer {
return &SpLogEntriesDeserializer{}
}
-func (p SpLogEntriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
- var userAccount []functions.SpLogEntry
- err := json.Unmarshal(data, &userAccount)
- if err != nil {
+func (s SpLogEntriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
+ var spLogEntry []functions.SpLogEntry
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&spLogEntry); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
- return userAccount, nil
+ return spLogEntry, nil
}
type SpMetricsEntryDeserializer struct{}
@@ -209,10 +236,12 @@ func NewSpMetricsEntryDeserializer()
*SpMetricsEntryDeserializer {
return &SpMetricsEntryDeserializer{}
}
-func (p SpMetricsEntryDeserializer) Unmarshal(data []byte) (interface{},
error) {
+func (s SpMetricsEntryDeserializer) Unmarshal(data []byte) (interface{},
error) {
var spMetricsEntry functions.SpMetricsEntry
- err := json.Unmarshal(data, &spMetricsEntry)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&spMetricsEntry); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return spMetricsEntry, nil
@@ -224,10 +253,12 @@ func NewFunctionDefinitionsDeserializer()
*FunctionDefinitionsDeserializer {
return &FunctionDefinitionsDeserializer{}
}
-func (p FunctionDefinitionsDeserializer) Unmarshal(data []byte) (interface{},
error) {
+func (f FunctionDefinitionsDeserializer) Unmarshal(data []byte) (interface{},
error) {
var functionDefinitions []functions.FunctionDefinition
- err := json.Unmarshal(data, &functionDefinitions)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&functionDefinitions); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return functionDefinitions, nil
@@ -242,8 +273,10 @@ func NewShortUserInfosDeserializer()
*ShortUserInfosDeserializer {
func (s ShortUserInfosDeserializer) Unmarshal(data []byte) (interface{},
error) {
var shortUserInfo []streampipes_user.ShortUserInfo
- err := json.Unmarshal(data, &shortUserInfo)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&shortUserInfo); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return shortUserInfo, nil
@@ -257,8 +290,10 @@ func NewUserAccountDeserializer() *UserAccountDeserializer
{
func (p UserAccountDeserializer) Unmarshal(data []byte) (interface{}, error) {
var userAccount streampipes_user.UserAccount
- err := json.Unmarshal(data, &userAccount)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&userAccount); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return userAccount, nil
@@ -273,10 +308,13 @@ func NewPipelineDeserializer() *PipelineDeserializer {
func (p PipelineDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipeLine pipeline.Pipeline
- err := json.Unmarshal(data, &pipeLine)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&pipeLine); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
+
return pipeLine, nil
}
@@ -289,8 +327,11 @@ func NewPipelinesDeserializer() *PipelinesDeserializer {
func (p PipelinesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipelines []pipeline.Pipeline
- err := json.Unmarshal(data, &pipelines)
- if err != nil {
+
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&pipelines); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return pipelines, nil
@@ -305,8 +346,10 @@ func NewPipelineStatusMessagesDeserializer()
*PipelineStatusMessagesDeserializer
func (p PipelineStatusMessagesDeserializer) Unmarshal(data []byte)
(interface{}, error) {
var pipelineStatusMessage []pipeline.PipelineStatusMessage
- err := json.Unmarshal(data, &pipelineStatusMessage)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&pipelineStatusMessage); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return pipelineStatusMessage, nil
@@ -320,8 +363,10 @@ func NewPipelineOperationStatusDeserializer()
*PipelineOperationStatusDeserializ
func (p PipelineOperationStatusDeserializer) Unmarshal(data []byte)
(interface{}, error) {
var pipelineOperationStatus pipeline.PipelineOperationStatus
- err := json.Unmarshal(data, &pipelineOperationStatus)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+ if err := dec.Decode(&pipelineOperationStatus); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Println(err)
return nil, err
}
return pipelineOperationStatus, nil
@@ -335,8 +380,11 @@ func NewAdapterDeserializer() *AdapterDeserializer {
func (a AdapterDeserializer) Unmarshal(data []byte) (interface{}, error) {
var adapterDescription adapter.AdapterDescription
- err := json.Unmarshal(data, &adapterDescription)
- if err != nil {
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+
+ if err := dec.Decode(&adapterDescription); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Fatal(err)
return nil, err
}
return adapterDescription, nil
@@ -351,8 +399,12 @@ func NewAdaptersDeserializer() *AdaptersDeserializer {
func (a AdaptersDeserializer) Unmarshal(data []byte) (interface{}, error) {
var adapters []adapter.AdapterDescription
- err := json.Unmarshal(data, &adapters)
- if err != nil {
+
+ dec := json.NewDecoder(strings.NewReader(string(data)))
+ dec.DisallowUnknownFields()
+
+ if err := dec.Decode(&adapters); err != nil &&
!strings.Contains(err.Error(), "unknown field") {
+ log.Fatal(err)
return nil, err
}
return adapters, nil
diff --git
a/streampipes-client-go/streampipes/model/adapter/adapter_description.go
b/streampipes-client-go/streampipes/model/adapter/adapter_description.go
index 4b856d32fa..7180442721 100644
--- a/streampipes-client-go/streampipes/model/adapter/adapter_description.go
+++ b/streampipes-client-go/streampipes/model/adapter/adapter_description.go
@@ -20,6 +20,7 @@ package adapter
import "github.com/apache/streampipes/streampipes-client-go/streampipes/model"
type AdapterDescription struct {
+ Class string
`json:"@class"`
ElementID string
`json:"elementId"`
Rev string
`json:"rev"`
DOM string
`json:"dom"`
diff --git a/streampipes-client-go/streampipes/model/common.go
b/streampipes-client-go/streampipes/model/common.go
index d5a81d684b..b353308556 100644
--- a/streampipes-client-go/streampipes/model/common.go
+++ b/streampipes-client-go/streampipes/model/common.go
@@ -86,6 +86,51 @@ type StaticProperty struct {
Class string `json:"@class"`
}
+// MappingProperty embeds StaticProperty and adds a requirement selector, the
+// list of options it can map from, and a property scope.
+// NOTE(review): the new fields carry no json tags, so they are encoded under
+// their Go names - confirm this matches the backend's key names.
+type MappingProperty struct {
+ StaticProperty
+ RequirementSelector string
+ MapsFromOptions []string
+ PropertyScope string
+}
+
+// MappingPropertyUnary embeds MappingProperty and adds the single selected
+// property.
+type MappingPropertyUnary struct {
+ MappingProperty
+ SelectedProperty string
+}
+
+// FreeTextStaticProperty embeds StaticProperty and adds a text value, the
+// name it maps to, and several boolean options.
+type FreeTextStaticProperty struct {
+	StaticProperty
+	// Value is exported so that encoding/json can read and write it; the
+	// previous lower-case spelling made the field invisible to the encoder
+	// and decoder, unlike every other field of this struct.
+	Value                 string
+	MapsTo                string
+	MultiLine             bool
+	HtmlAllowed           bool
+	HtmlFontFormat        bool
+	PlaceholdersSupported bool
+}
+
+// SelectionStaticProperty holds the horizontal-rendering flag shared by the
+// selection-style properties below.
+// NOTE(review): unlike the other static property types added here, this one
+// does not embed StaticProperty - confirm that is intended.
+type SelectionStaticProperty struct {
+ HorizontalRendering bool
+}
+
+// OneOfStaticProperty embeds SelectionStaticProperty without adding fields.
+type OneOfStaticProperty struct {
+ SelectionStaticProperty
+}
+
+// AnyStaticProperty embeds SelectionStaticProperty without adding fields.
+type AnyStaticProperty struct {
+ SelectionStaticProperty
+}
+
+// RuntimeResolvableAnyStaticProperty embeds AnyStaticProperty and adds the
+// list of names it depends on.
+type RuntimeResolvableAnyStaticProperty struct {
+ AnyStaticProperty
+ DependsOn []string
+}
+
+// SlideToggleStaticProperty embeds StaticProperty and adds the selected state
+// together with its default value.
+type SlideToggleStaticProperty struct {
+ StaticProperty
+ Selected bool
+ DefaultValue bool
+}
+
type SpDataStream struct {
ElementId string `json:"elementId"`
Dom string `json:"dom"`
diff --git a/streampipes-client-go/streampipes/model/pipeline/pipeline.go
b/streampipes-client-go/streampipes/model/pipeline/pipeline.go
index 0586be659d..65273cd227 100644
--- a/streampipes-client-go/streampipes/model/pipeline/pipeline.go
+++ b/streampipes-client-go/streampipes/model/pipeline/pipeline.go
@@ -30,23 +30,23 @@ const (
)
type Pipeline struct {
- Sepas []DataProcessorInvocation //`json:"sepas"`
- Streams []model.SpDataStream //`json:"streams"`
- Name string //`json:"name"`
- Description string
//`json:"description,omitempty"`
- Actions []DataSinkInvocation //`json:"actions"`
- Running bool //`json:"running"`
- RestartOnSystemReboot bool
//`json:"restartOnSystemReboot"`
- Valid bool //`json:"valid"`
- StartedAt int64
//`json:"startedAt,omitempty"`
- CreatedAt int64 //`json:"createdAt"`
- PublicElement bool //`json:"publicElement"`
- CreatedByUser string //`json:"createdByUser"`
- PipelineCategories []string
//`json:"pipelineCategories"`
- PipelineNotifications []string
//`json:"pipelineNotifications"`
- HealthStatus PipelineHealthStatus //`json:"healthStatus"`
- ID string //`json:"_id,omitempty"`
- Rev string
//`json:"_rev,omitempty"`
+ Sepas []DataProcessorInvocation `json:"sepas"`
+ Streams []model.SpDataStream `json:"streams"`
+ Name string `json:"name"`
+ Description string
`json:"description,omitempty"`
+ Actions []DataSinkInvocation `json:"actions"`
+ Running bool `json:"running"`
+ RestartOnSystemReboot bool
`json:"restartOnSystemReboot"`
+ Valid bool `json:"valid"`
+ StartedAt int64
`json:"startedAt,omitempty"`
+ CreatedAt int64 `json:"createdAt"`
+ PublicElement bool `json:"publicElement"`
+ CreatedByUser string `json:"createdByUser"`
+ PipelineCategories []string
`json:"pipelineCategories"`
+ PipelineNotifications []string
`json:"pipelineNotifications"`
+ HealthStatus PipelineHealthStatus `json:"healthStatus"`
+ PipelineId string `json:"_id"`
+ Rev string `json:"_rev"`
}
type DataProcessorInvocation struct {
diff --git a/streampipes-client-go/streampipes/pipeline_api.go
b/streampipes-client-go/streampipes/pipeline_api.go
index e58ccc7096..f507134685 100644
--- a/streampipes-client-go/streampipes/pipeline_api.go
+++ b/streampipes-client-go/streampipes/pipeline_api.go
@@ -18,15 +18,14 @@
package streampipes
import (
- "fmt"
- "io"
- "log"
- "net/http"
"github.com/apache/streampipes/streampipes-client-go/streampipes/config"
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/serializer"
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/util"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/pipeline"
+ "io"
+ "log"
+ "net/http"
)
type Pipeline struct {
@@ -156,13 +155,9 @@ func (p *Pipeline) GetAllPipeline() ([]pipeline.Pipeline,
error) {
}
// CreatePipeline store a new pipeline
-func (p *Pipeline) CreatePipeline(pp pipeline.Pipeline)
(model.ResponseMessage, error) {
+func (p *Pipeline) CreatePipeline(body []byte) (model.ResponseMessage, error) {
endPointUrl := util.NewStreamPipesApiPath(p.config.Url,
"streampipes-backend/api/v2/pipelines", nil)
- body, err := serializer.NewPipelineSerializer().Marshal(pp)
- if err != nil {
- return model.ResponseMessage{}, err
- }
response, err := p.executeRequest("POST", endPointUrl, body)
if err != nil {
return model.ResponseMessage{}, err
@@ -180,7 +175,6 @@ func (p *Pipeline) CreatePipeline(pp pipeline.Pipeline)
(model.ResponseMessage,
unmarshalData, err :=
serializer.NewResponseMessageDeserializer().Unmarshal(data)
if err != nil {
- fmt.Println(err, 11)
return model.ResponseMessage{}, err
}
message := unmarshalData.(model.ResponseMessage)