This is an automated email from the ASF dual-hosted git repository.

zhenyu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 46de302e90 feat: implement the e2e testing framework of go-client 
(#3261)
46de302e90 is described below

commit 46de302e905a556a3e8682d5845ea92552318dc7
Author: wenyu <[email protected]>
AuthorDate: Wed Oct 30 20:04:08 2024 +0800

    feat: implement the e2e testing framework of go-client (#3261)
    
    * feat: Implement the e2e testing framework of go-client
    
    * feat: Implement the e2e testing framework of go-client
    
    * feat: Implement the e2e testing framework of go-client
    
    * feat: Implement the e2e testing framework of go-client
    
    * feat: supplement the model
    
    * fix:unmarshal fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * fix:ci fail
    
    * feat : implement datalake_test
    
    * fix : unmarshal fail
---
 .github/workflows/go-client-e2e-test.yml           |  66 ++
 streampipes-client-e2e/README.md                   |  46 +
 streampipes-client-e2e/docker-compose.yml          | 135 +++
 .../go-client-e2e/adapter/machine.json             | 225 +++++
 .../go-client-e2e/adapter_test.go                  | 134 +++
 .../go-client-e2e/datalake_test.go                 |  52 ++
 streampipes-client-e2e/go-client-e2e/go.mod        |  23 +
 .../go-client-e2e/pipeline_test.go                 | 138 +++
 .../go-client-e2e/pipelines/pipelines.json         | 960 +++++++++++++++++++++
 .../go-client-e2e/utils/create_machine_apadter.go  |  37 +
 .../go-client-e2e/utils/streampipes_client.go      |  36 +
 streampipes-client-e2e/tool/go-client-e2e.sh       |  61 ++
 streampipes-client-e2e/tool/install-element.sh     | 113 +++
 .../tool/start-streampipes-client-e2e.sh           | 148 ++++
 streampipes-client-go/streampipes/adapter_api.go   |  11 +-
 .../internal/serializer/deserializer.go            | 158 ++--
 .../model/adapter/adapter_description.go           |   1 +
 streampipes-client-go/streampipes/model/common.go  |  45 +
 .../streampipes/model/pipeline/pipeline.go         |  34 +-
 streampipes-client-go/streampipes/pipeline_api.go  |  14 +-
 20 files changed, 2349 insertions(+), 88 deletions(-)

diff --git a/.github/workflows/go-client-e2e-test.yml 
b/.github/workflows/go-client-e2e-test.yml
new file mode 100644
index 0000000000..4adec93e16
--- /dev/null
+++ b/.github/workflows/go-client-e2e-test.yml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Builds StreamPipes from source, packages the backend and extensions as local
+# Docker images, starts them with Docker Compose and runs the Go client e2e
+# tests. Triggered on every pull request.
+name: run-e2e-tests
+
+on:
+  pull_request:
+
+jobs:
+  # NOTE(review): despite its name, no step of this job pushes an image; the
+  # images are only built locally and consumed by docker compose below.
+  build-and-push-to-docker:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v1
+
+      # NOTE(review): none of the `docker build` commands below reference
+      # /tmp/.buildx-cache, so this cache looks unused - confirm.
+      - name: Cache Docker layers
+        uses: actions/cache@v2
+        with:
+          path: /tmp/.buildx-cache
+          key: ${{ runner.os }}-buildx-${{ github.sha }}
+          restore-keys: |
+            ${{ runner.os }}-buildx-
+
+      - name: Set up Go 1.21
+        uses: actions/setup-go@v3
+        with:
+          go-version: '1.21'
+
+      - name: Set up JDK 17
+        uses: actions/setup-java@v4
+        with:
+          distribution: 'temurin'
+          java-version: '17'
+          cache: 'maven'
+
+      - name: Build with Maven
+        run: mvn clean install
+
+      # The image tags built here are the ones referenced by
+      # streampipes-client-e2e/docker-compose.yml.
+      - name: Build Docker image
+        run: |
+          docker build -t 
streampipes_pipeline-elements-all-jvm:release-validation 
./streampipes-extensions/streampipes-extensions-all-jvm
+          docker build -t streampipes_backend:release-validation 
./streampipes-service-core
+
+      - name: Start streampipes
+        run: docker compose -f ./streampipes-client-e2e/docker-compose.yml -p 
streampipes-client-e2e up -d
+
+      # The start script has to be run from within the tool directory.
+      - name: Start go client test
+        run: |
+          cd ./streampipes-client-e2e/tool
+          chmod +x ./start-streampipes-client-e2e.sh
+          ./start-streampipes-client-e2e.sh -t go-client-e2e.sh
\ No newline at end of file
diff --git a/streampipes-client-e2e/README.md b/streampipes-client-e2e/README.md
new file mode 100644
index 0000000000..dcd05bc21b
--- /dev/null
+++ b/streampipes-client-e2e/README.md
@@ -0,0 +1,46 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+# Client E2E
+
+## Environment Setup
+Before running, make sure that StreamPipes is up and running and that the matching connection settings are passed to the `start-streampipes-client-e2e.sh` script.
+**Note**: This script must be executed from within the [tool](./tool) 
directory.
+```shell
+./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u 
[email protected]   -pw admin -t go-client-e2e.sh
+```
+
+## Usage Instructions
+| Parameter | Default           | Required | Description |
+|-----------|-------------------|----------|-------------|
+| `-h`      | `127.0.0.1`       | No       | Host address |
+| `-p`      | `8030`            | No       | Backend port |
+| `-u`      | `[email protected]` | No       | User |
+| `-pw`     | `admin`           | No       | Password |
+| `-t`      | `""`              | Yes      | Name of the client's E2E test startup script (the script must be located in the [tool](./tool) directory) |
+
+## How to add E2E in a new language
+1. To add E2E tests for a new language, add the pipeline elements your tests use to `install-element.sh`.
+2. Write a startup script for the new language; `go-client-e2e.sh` can serve as a reference. The startup script receives the four parameters listed below, which provide the information needed to run the client.
+
+| Parameter | Description |
+|-----------|-------------|
+| `-h`      | Host        |
+| `-p`      | Backend port|
+| `-u`      | User        |
+| `-k`      | API Key     |
\ No newline at end of file
diff --git a/streampipes-client-e2e/docker-compose.yml 
b/streampipes-client-e2e/docker-compose.yml
new file mode 100644
index 0000000000..09e2c9baa5
--- /dev/null
+++ b/streampipes-client-e2e/docker-compose.yml
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Compose stack used by the client e2e tests: the StreamPipes backend and
+# extensions images plus the third-party services defined below.
+version: "3.8"
+
+# global logging
+x-logging: &default-logging
+  options:
+    max-size: "12m"
+    max-file: "5"
+  driver: json-file
+
+services:
+  #### apache/streampipes
+  # Backend image; port 8030 is the only port of this stack published to the host.
+  backend:
+    image: streampipes_backend:release-validation
+    ports:
+      - "8030:8030"
+    depends_on:
+      - couchdb
+    volumes:
+      - backend:/root/.streampipes
+      - files:/spImages
+    logging: *default-logging
+    networks:
+      spnet:
+
+  # Extensions image; shares the `files` volume with the backend.
+  extensions-all-jvm:
+    image: streampipes_pipeline-elements-all-jvm:release-validation
+    volumes:
+      - files:/spImages
+    logging: *default-logging
+    networks:
+      spnet:
+
+  couchdb:
+    image: couchdb:2.3.1
+    environment:
+      - COUCHDB_USER=admin
+      - COUCHDB_PASSWORD=admin
+    logging: *default-logging
+    volumes:
+      - couchdb:/opt/couchdb/data
+    networks:
+      spnet:
+
+  kafka:
+    image: fogsyio/kafka:2.2.0
+    hostname: kafka
+    depends_on:
+      - zookeeper
+    environment:
+      # see: https://github.com/confluentinc/schema-registry/issues/648
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
+      KAFKA_LISTENERS: PLAINTEXT://:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_ADVERTISED_HOST_NAME: kafka
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+    volumes:
+      - kafka:/kafka
+      - /var/run/docker.sock:/var/run/docker.sock
+    logging: *default-logging
+    networks:
+      spnet:
+
+  zookeeper:
+    image: fogsyio/zookeeper:3.4.13
+    logging: *default-logging
+    volumes:
+      - zookeeper:/opt/zookeeper-3.4.13
+    networks:
+      spnet:
+
+  influxdb:
+    image: influxdb:2.6
+    environment:
+      - INFLUXDB_DATA_ENGINE=tsm1
+      - INFLUXDB_REPORTING_DISABLED=false
+      - INFLUXDB_ADMIN_ENABLED=true
+      - DOCKER_INFLUXDB_INIT_USERNAME=admin
+      - DOCKER_INFLUXDB_INIT_PASSWORD=sp-admin
+      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=sp-admin
+      - DOCKER_INFLUXDB_INIT_ORG=sp
+      - DOCKER_INFLUXDB_INIT_BUCKET=sp
+      - DOCKER_INFLUXDB_INIT_MODE=setup
+    volumes:
+      - influxdb:/var/lib/influxdb
+      - influxdb2:/var/lib/influxdb2
+    logging: *default-logging
+    networks:
+      spnet:
+
+  mosquitto:
+    image: eclipse-mosquitto:1.5.4
+    logging: *default-logging
+    networks:
+      spnet:
+
+  # NOTE(review): this image is referenced without a tag - confirm whether it
+  # should be pinned like the other images in this file.
+  opcua:
+    image: mcr.microsoft.com/iotedge/opc-plc
+    logging: *default-logging
+    command: --ut=true
+    networks:
+      spnet:
+
+# NOTE(review): the `nginx` volume is declared but no service in this file
+# mounts it.
+volumes:
+  kafka:
+  files:
+  couchdb:
+  zookeeper:
+  influxdb:
+  influxdb2:
+  backend:
+  nginx:
+
+# All services join the single bridge network `spnet` with a fixed subnet.
+networks:
+  spnet:
+    name: spnet
+    driver: bridge
+    ipam:
+      config:
+        - subnet: 172.31.0.0/16
diff --git a/streampipes-client-e2e/go-client-e2e/adapter/machine.json 
b/streampipes-client-e2e/go-client-e2e/adapter/machine.json
new file mode 100644
index 0000000000..d50a60be01
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/adapter/machine.json
@@ -0,0 +1,225 @@
+{
+  "@class": "org.apache.streampipes.model.connect.adapter.AdapterDescription",
+  "appId": "org.apache.streampipes.connect.iiot.adapters.simulator.machine",
+  "connectedTo": null,
+  "description": "",
+  "dom": null,
+  "elementId": 
"sp:org.apache.streampipes.connect.iiot.adapters.simulator.machine",
+  "includedAssets": [
+    "documentation.md",
+    "icon.png"
+  ],
+  "includedLocales": [
+    "strings.en"
+  ],
+  "includesAssets": true,
+  "includesLocales": true,
+  "internallyManaged": false,
+  "name": "test1",
+  "rev": "1-73ca816b2ed82d8232b864556c7f2d64",
+  "version": 0,
+  "category": [
+    "Debugging"
+  ],
+  "config": [
+    {
+      "@class": 
"org.apache.streampipes.model.staticproperty.FreeTextStaticProperty",
+      "description": "The time to wait between two events in milliseconds",
+      "internalName": "wait-time-ms",
+      "label": "Wait Time (MS)",
+      "optional": false,
+      "staticPropertyType": "FreeTextStaticProperty",
+      "htmlAllowed": false,
+      "htmlFontFormat": false,
+      "mapsTo": null,
+      "multiLine": false,
+      "placeholdersSupported": false,
+      "requiredDatatype": "http://www.w3.org/2001/XMLSchema#integer",
+      "requiredDomainProperty": null,
+      "value": "1000",
+      "valueSpecification": null
+    },
+    {
+      "@class": 
"org.apache.streampipes.model.staticproperty.OneOfStaticProperty",
+      "description": "Select simulated sensor data to be published",
+      "internalName": "selected-simulator-option",
+      "label": "Select sensor",
+      "optional": false,
+      "staticPropertyType": "OneOfStaticProperty",
+      "horizontalRendering": false,
+      "options": [
+        {
+          "elementId": "sp:option:pOPYGJ",
+          "internalName": null,
+          "name": "flowrate",
+          "selected": true
+        },
+        {
+          "elementId": "sp:option:CpeHtP",
+          "internalName": null,
+          "name": "pressure",
+          "selected": false
+        },
+        {
+          "elementId": "sp:option:KYnhkL",
+          "internalName": null,
+          "name": "waterlevel",
+          "selected": false
+        }
+      ]
+    }
+  ],
+  "correspondingDataStreamElementId": null,
+  "correspondingServiceGroup": null,
+  "createdAt": 0,
+  "dataStream": {
+    "@class": "org.apache.streampipes.model.SpDataStream",
+    "appId": "urn:streampipes.apache.org:eventstream:hLNLBx",
+    "connectedTo": null,
+    "description": null,
+    "dom": null,
+    "elementId": "urn:streampipes.apache.org:eventstream:hLNLBx",
+    "includedAssets": [],
+    "includedLocales": [],
+    "includesAssets": false,
+    "includesLocales": false,
+    "internallyManaged": false,
+    "name": null,
+    "rev": null,
+    "category": null,
+    "correspondingAdapterId": null,
+    "eventGrounding": null,
+    "eventSchema": {
+      "eventProperties": [
+        {
+          "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+          "additionalMetadata": {},
+          "description": "Denotes the current density of the fluid",
+          "elementId": "sp:eventproperty:Ruvqlo",
+          "label": "Density",
+          "propertyScope": "MEASUREMENT_PROPERTY",
+          "runtimeId": null,
+          "runtimeName": "density",
+          "semanticType": "http://schema.org/Number",
+          "measurementUnit": null,
+          "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+          "valueSpecification": null,
+          "id": 1510309651547
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+          "additionalMetadata": {},
+          "description": "Denotes the current mass flow in the sensor",
+          "elementId": "sp:eventproperty:NyTjgp",
+          "label": "Mass Flow",
+          "propertyScope": "MEASUREMENT_PROPERTY",
+          "runtimeId": null,
+          "runtimeName": "mass_flow",
+          "semanticType": "http://schema.org/Number",
+          "measurementUnit": null,
+          "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+          "valueSpecification": null,
+          "id": 4855204620884
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+          "additionalMetadata": {},
+          "description": "The ID of the sensor",
+          "elementId": "sp:eventproperty:ThjvdL",
+          "label": "Sensor ID",
+          "propertyScope": "DIMENSION_PROPERTY",
+          "runtimeId": null,
+          "runtimeName": "sensorId",
+          "semanticType": "https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+          "measurementUnit": null,
+          "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+          "valueSpecification": null,
+          "id": 3260657832066
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+          "additionalMetadata": {},
+          "description": "Any fault flags of the sensors",
+          "elementId": "sp:eventproperty:vxokoH",
+          "label": "Sensor Fault Flags",
+          "propertyScope": "MEASUREMENT_PROPERTY",
+          "runtimeId": null,
+          "runtimeName": "sensor_fault_flags",
+          "semanticType": "http://schema.org/Boolean",
+          "measurementUnit": null,
+          "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+          "valueSpecification": null,
+          "id": 6931662104350
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+          "additionalMetadata": {},
+          "description": "Denotes the current temperature in degrees celsius",
+          "elementId": "sp:eventproperty:cmnXbe",
+          "label": "Temperature",
+          "propertyScope": "MEASUREMENT_PROPERTY",
+          "runtimeId": null,
+          "runtimeName": "temperature",
+          "semanticType": "http://schema.org/Number",
+          "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+          "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+          "valueSpecification": {
+            "@class": "org.apache.streampipes.model.schema.QuantitativeValue",
+            "maxValue": 100,
+            "minValue": 0,
+            "step": 0.1
+          },
+          "id": 1980836171418
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+          "additionalMetadata": {},
+          "description": "The current timestamp value",
+          "elementId": "sp:eventproperty:AYjMkj",
+          "label": "Timestamp",
+          "propertyScope": "HEADER_PROPERTY",
+          "runtimeId": null,
+          "runtimeName": "timestamp",
+          "semanticType": "http://schema.org/DateTime",
+          "measurementUnit": null,
+          "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+          "valueSpecification": null,
+          "id": 5857972009985
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+          "additionalMetadata": {},
+          "description": "Denotes the current volume flow",
+          "elementId": "sp:eventproperty:ORzRsw",
+          "label": "Volume Flow",
+          "propertyScope": "MEASUREMENT_PROPERTY",
+          "runtimeId": null,
+          "runtimeName": "volume_flow",
+          "semanticType": "http://schema.org/Number",
+          "measurementUnit": null,
+          "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+          "valueSpecification": null,
+          "id": 64305844251
+        }
+      ]
+    },
+    "index": 0
+  },
+  "deploymentConfiguration": {
+    "desiredServiceTags": [],
+    "selectedEndpointUrl": null
+  },
+  "eventGrounding": {
+    "transportProtocols": []
+  },
+  "eventSchema": {
+    "eventProperties": []
+  },
+  "icon": null,
+  "rules": [],
+  "running": false,
+  "schemaRules": [],
+  "selectedEndpointUrl": null,
+  "streamRules": [],
+  "valueRules": []
+}
\ No newline at end of file
diff --git a/streampipes-client-e2e/go-client-e2e/adapter_test.go 
b/streampipes-client-e2e/go-client-e2e/adapter_test.go
new file mode 100644
index 0000000000..88d41fcca2
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/adapter_test.go
@@ -0,0 +1,134 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package go_client_e2e
+
+import (
+       "go-client-e2e/utils"
+       "os"
+       "testing"
+)
+
+// Placeholder tokens used by the e2e tests. TestCreatePipeline replaces the
+// adapter ID, adapter name, stream revision and output-topic tokens in
+// pipelines/pipelines.json with the values of the adapter created at runtime.
+const (
+       E2E_ADAPTER_ID             = "e2e-adapter-id"
+       E2E_ADAPTER_NAME           = "e2e-adapter-name"
+       E2E_STREAM_REV             = "e2e-stream-rev"
+       E2E_HOST_NAME              = "e2e-host-name"
+       E2E_ADAPTER_OUT_TOPIC_NAME = "e2e-adapter-out-topic-name"
+       // NOTE(review): the value embeds literal double quotes, presumably so
+       // that the quoted token can be swapped for a bare JSON number - confirm.
+       E2E_PORT                   = "\"e2e-port\""
+)
+
+// TestCreateAdapter removes every existing adapter and then creates a new one
+// from the definition stored in adapter/machine.json. Any failure terminates
+// the test process with exit code 1.
+func TestCreateAdapter(t *testing.T) {
+       // Start from a clean state before creating the adapter.
+       TestDeleteAdapter(t)
+
+       client, clientErr := utils.CreateStreamPipesClient()
+       if clientErr != nil {
+               t.Log(clientErr)
+               os.Exit(1)
+       }
+
+       payload := utils.CreateData("adapter/machine.json")
+       if createErr := client.Adapter().CreateAdapter(payload); createErr != nil {
+               t.Log(createErr)
+               os.Exit(1)
+       }
+}
+
+// TestGetAdapter fetches all adapters and terminates the test process with
+// exit code 1 when the request fails or no adapter is returned.
+func TestGetAdapter(t *testing.T) {
+       client, clientErr := utils.CreateStreamPipesClient()
+       if clientErr != nil {
+               t.Error(clientErr)
+               os.Exit(1)
+       }
+
+       found, listErr := client.Adapter().GetAllAdapter()
+       switch {
+       case listErr != nil:
+               t.Error(listErr)
+               os.Exit(1)
+       case len(found) == 0:
+               t.Log("adapter is null")
+               os.Exit(1)
+       }
+}
+
+// TestStartAdapter starts the first adapter returned by the backend. The test
+// process exits with code 1 when the client cannot be created, the adapters
+// cannot be listed, no adapter exists, or the start request fails.
+func TestStartAdapter(t *testing.T) {
+       streamPipesClient, err := utils.CreateStreamPipesClient()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       adapters, err := streamPipesClient.Adapter().GetAllAdapter()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       if len(adapters) == 0 {
+               t.Log("adapter is null")
+               os.Exit(1)
+       }
+       // Report the error of the start call itself; the original reported the
+       // (always nil) error of the preceding list call.
+       if err = streamPipesClient.Adapter().StartSingleAdapter(adapters[0].ElementID); err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+}
+
+// TestStopAdapter stops every adapter and verifies, by fetching each adapter
+// again, that it is no longer reported as running. Any failure terminates the
+// test process with exit code 1.
+func TestStopAdapter(t *testing.T) {
+       streamPipesClient, err := utils.CreateStreamPipesClient()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       adapters, err := streamPipesClient.Adapter().GetAllAdapter()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       for _, adapter := range adapters {
+               if err := streamPipesClient.Adapter().StopSingleAdapter(adapter.ElementID); err != nil {
+                       t.Error(err)
+                       os.Exit(1)
+               }
+               current, err := streamPipesClient.Adapter().GetSingleAdapter(adapter.ElementID)
+               if err != nil {
+                       t.Error(err)
+                       os.Exit(1)
+               }
+               // A still-running adapter is a failure without an error value, so
+               // it gets its own message instead of logging a nil error.
+               if current.Running {
+                       t.Errorf("adapter %v is still running after being stopped", adapter.ElementID)
+                       os.Exit(1)
+               }
+       }
+}
+
+// TestDeleteAdapter deletes every adapter known to the backend. It is also
+// called by other tests to reset the adapter state. Any failure terminates
+// the test process with exit code 1.
+func TestDeleteAdapter(t *testing.T) {
+       streamPipesClient, err := utils.CreateStreamPipesClient()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       adapters, err := streamPipesClient.Adapter().GetAllAdapter()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       for _, adapter := range adapters {
+               // Report the delete error itself; the original reported the
+               // (always nil) error of the list call.
+               if err := streamPipesClient.Adapter().DeleteSingleAdapter(adapter.ElementID); err != nil {
+                       t.Error(err)
+                       os.Exit(1)
+               }
+       }
+}
diff --git a/streampipes-client-e2e/go-client-e2e/datalake_test.go 
b/streampipes-client-e2e/go-client-e2e/datalake_test.go
new file mode 100644
index 0000000000..6168139ed6
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/datalake_test.go
@@ -0,0 +1,52 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package go_client_e2e
+
+import (
+       "go-client-e2e/utils"
+       "os"
+       "testing"
+       "time"
+)
+
+// TestGetDataLake creates and starts the e2e pipeline, waits for it to run for
+// a while, and then checks that at least one data lake measure exists. The
+// pipeline is stopped and deleted before the result is evaluated. Any failure
+// terminates the test process with exit code 1.
+func TestGetDataLake(t *testing.T) {
+       TestCreatePipeline(t)
+       TestStartPipeline(t)
+       streamPipesClient, err := utils.CreateStreamPipesClient()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+
+       // Wait before querying so the started pipeline has a chance to persist
+       // events; the original slept only after the measures had been fetched,
+       // which made the wait ineffective.
+       time.Sleep(10 * time.Second)
+
+       measures, err := streamPipesClient.DataLakeMeasures().GetAllDataLakeMeasure()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+
+       // Clean up in both the success and the failure case.
+       TestStopPipeline(t)
+       TestDeletePipeline(t)
+
+       if len(measures) == 0 {
+               t.Error("No data lake measures found")
+               os.Exit(1)
+       }
+}
diff --git a/streampipes-client-e2e/go-client-e2e/go.mod 
b/streampipes-client-e2e/go-client-e2e/go.mod
new file mode 100644
index 0000000000..f7c1cd0cc1
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/go.mod
@@ -0,0 +1,23 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+module go-client-e2e
+
+go 1.21.6
+
+// The Go client is resolved from the local checkout through the replace
+// directive below, so the v0.0.0 version is only a placeholder.
+require github.com/apache/streampipes/streampipes-client-go v0.0.0 // indirect
+replace "github.com/apache/streampipes/streampipes-client-go" => 
+"../../streampipes-client-go"
\ No newline at end of file
diff --git a/streampipes-client-e2e/go-client-e2e/pipeline_test.go 
b/streampipes-client-e2e/go-client-e2e/pipeline_test.go
new file mode 100644
index 0000000000..e85fc375fe
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/pipeline_test.go
@@ -0,0 +1,138 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package go_client_e2e
+
+import (
+       "go-client-e2e/utils"
+       "os"
+       "strings"
+       "testing"
+)
+
+// TestCreatePipeline resets pipelines and adapters, creates and starts a fresh
+// adapter, and then creates a pipeline from pipelines/pipelines.json after
+// substituting the placeholder tokens with the adapter's ID, name, revision
+// and output topic. Any failure terminates the test process with exit code 1.
+func TestCreatePipeline(t *testing.T) {
+       TestDeletePipeline(t)
+       TestCreateAdapter(t)
+
+       streamPipesClient, err := utils.CreateStreamPipesClient()
+       if err != nil {
+               // The original exited here without reporting the cause.
+               t.Error(err)
+               os.Exit(1)
+       }
+       TestStartAdapter(t)
+       adapters, err := streamPipesClient.Adapter().GetAllAdapter()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       if len(adapters) == 0 {
+               t.Log("adapter is null")
+               os.Exit(1)
+       }
+       adapter := adapters[0]
+
+       // Guard the index below: an adapter without a transport protocol would
+       // otherwise crash the test binary with an index-out-of-range panic.
+       if len(adapter.EventGrounding.TransportProtocols) == 0 {
+               t.Errorf("adapter %v has no transport protocol", adapter.ElementID)
+               os.Exit(1)
+       }
+       topicName := adapter.EventGrounding.TransportProtocols[0].TopicDefinition.ActualTopicName
+
+       data := utils.CreateData("pipelines/pipelines.json")
+       pipeline := string(data)
+       pipeline = strings.ReplaceAll(pipeline, E2E_ADAPTER_ID, adapter.ElementID)
+       pipeline = strings.ReplaceAll(pipeline, E2E_ADAPTER_NAME, adapter.Name)
+       pipeline = strings.ReplaceAll(pipeline, E2E_STREAM_REV, adapter.Rev)
+       pipeline = strings.ReplaceAll(pipeline, E2E_ADAPTER_OUT_TOPIC_NAME, topicName)
+
+       message, statusErr := streamPipesClient.Pipeline().CreatePipeline([]byte(pipeline))
+       // Handle the error first so that message is never inspected when the
+       // request itself failed.
+       if statusErr != nil {
+               t.Error(statusErr)
+               os.Exit(1)
+       }
+       if !message.Success {
+               t.Error("pipeline creation was not successful")
+               os.Exit(1)
+       }
+}
+
+// TestGetPipeline fetches all pipelines and terminates the test process with
+// exit code 1 when the request fails or no pipeline is returned.
+func TestGetPipeline(t *testing.T) {
+       client, clientErr := utils.CreateStreamPipesClient()
+       if clientErr != nil {
+               t.Error(clientErr)
+               os.Exit(1)
+       }
+
+       found, listErr := client.Pipeline().GetAllPipeline()
+       switch {
+       case listErr != nil:
+               t.Error(listErr)
+               os.Exit(1)
+       case len(found) == 0:
+               t.Log("pipelines is null")
+               os.Exit(1)
+       }
+}
+
+// TestStartPipeline starts the first pipeline returned by the backend. The
+// test process exits with code 1 when the client cannot be created, the
+// pipelines cannot be listed, no pipeline exists, or the start fails.
+func TestStartPipeline(t *testing.T) {
+       streamPipesClient, err := utils.CreateStreamPipesClient()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       pipelines, err := streamPipesClient.Pipeline().GetAllPipeline()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       if len(pipelines) == 0 {
+               t.Log("pipelines is null")
+               os.Exit(1)
+       }
+
+       status, statusErr := streamPipesClient.Pipeline().StartSinglePipeline(pipelines[0].PipelineId)
+       // Report the request error itself and do not inspect status when the
+       // call failed; the original logged status.Title in both cases.
+       if statusErr != nil {
+               t.Error(statusErr)
+               os.Exit(1)
+       }
+       if !status.Success {
+               t.Error(status.Title)
+               os.Exit(1)
+       }
+}
+
+// TestStopPipeline stops every pipeline known to the backend. Any failure
+// terminates the test process with exit code 1.
+func TestStopPipeline(t *testing.T) {
+       streamPipesClient, err := utils.CreateStreamPipesClient()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       pipelines, err := streamPipesClient.Pipeline().GetAllPipeline()
+       // The original never checked this error, so a failed list call made the
+       // test pass without stopping anything.
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       for _, pipeline := range pipelines {
+               operation, operationErr := streamPipesClient.Pipeline().StopSinglePipeline(pipeline.PipelineId)
+               if operationErr != nil {
+                       t.Error(operationErr)
+                       os.Exit(1)
+               }
+               if !operation.Success {
+                       t.Errorf("stopping pipeline %v was not successful", pipeline.PipelineId)
+                       os.Exit(1)
+               }
+       }
+}
+
+// TestDeletePipeline deletes every pipeline known to the backend and then
+// deletes all adapters via TestDeleteAdapter. Any failure terminates the test
+// process with exit code 1.
+func TestDeletePipeline(t *testing.T) {
+       streamPipesClient, err := utils.CreateStreamPipesClient()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       pipelines, err := streamPipesClient.Pipeline().GetAllPipeline()
+       if err != nil {
+               t.Error(err)
+               os.Exit(1)
+       }
+       for _, pipeline := range pipelines {
+               // Report the delete error itself; the original reported the
+               // (always nil) error of the list call.
+               if err := streamPipesClient.Pipeline().DeleteSinglePipeline(pipeline.PipelineId); err != nil {
+                       t.Error(err)
+                       os.Exit(1)
+               }
+       }
+       TestDeleteAdapter(t)
+}
diff --git a/streampipes-client-e2e/go-client-e2e/pipelines/pipelines.json 
b/streampipes-client-e2e/go-client-e2e/pipelines/pipelines.json
new file mode 100644
index 0000000000..3cebc73a5f
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/pipelines/pipelines.json
@@ -0,0 +1,960 @@
+{
+  "actions": [
+    {
+      "@class": "org.apache.streampipes.model.graph.DataSinkInvocation",
+      "appId": "org.apache.streampipes.sinks.internal.jvm.datalake",
+      "connectedTo": [
+        "jsplumb_1_j3BB"
+      ],
+      "description": "Stores events in the internal data lake.",
+      "dom": "jsplumb_2_Fahe",
+      "elementId": "sp:datasinkinvocation:MZnKuj:qULqX",
+      "includedAssets": [
+        "documentation.md",
+        "icon.png"
+      ],
+      "includedLocales": [
+        "strings.en"
+      ],
+      "includesAssets": true,
+      "includesLocales": true,
+      "internallyManaged": false,
+      "name": "Data Lake",
+      "rev": null,
+      "version": 2,
+      "belongsTo": "sp:org.apache.streampipes.sinks.internal.jvm.datalake",
+      "configured": true,
+      "correspondingPipeline": null,
+      "correspondingUser": null,
+      "inputStreams": [
+        {
+          "@class": "org.apache.streampipes.model.SpDataStream",
+          "appId": "urn:streampipes.apache.org:eventstream:mqYwaw",
+          "connectedTo": null,
+          "description": null,
+          "dom": null,
+          "elementId": "urn:streampipes.apache.org:eventstream:mqYwaw",
+          "includedAssets": [],
+          "includedLocales": [],
+          "includesAssets": false,
+          "includesLocales": false,
+          "internallyManaged": false,
+          "name": null,
+          "rev": null,
+          "category": null,
+          "correspondingAdapterId": null,
+          "eventGrounding": {
+            "transportProtocols": [
+              {
+                "@class": 
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+                "brokerHostname": "kafka",
+                "elementId": null,
+                "topicDefinition": {
+                  "@class": 
"org.apache.streampipes.model.grounding.SimpleTopicDefinition",
+                  "actualTopicName": 
"org-apache-streampipes-internal-prbfXSLBvVHpoeX"
+                },
+                "acks": null,
+                "batchSize": null,
+                "groupId": null,
+                "kafkaPort": 9092,
+                "lingerMs": null,
+                "maxRequestSize": null,
+                "messageMaxBytes": null,
+                "offset": null,
+                "zookeeperHost": "zookeeper",
+                "zookeeperPort": 2181
+              }
+            ]
+          },
+          "eventSchema": {
+            "eventProperties": [
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Denotes the current density of the fluid",
+                "elementId": "sp:eventproperty:Ruvqlo",
+                "label": "Density",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "density",
+                "semanticType": "http://schema.org/Number",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Denotes the current mass flow in the sensor",
+                "elementId": "sp:eventproperty:NyTjgp",
+                "label": "Mass Flow",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "mass_flow",
+                "semanticType": "http://schema.org/Number",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Any fault flags of the sensors",
+                "elementId": "sp:eventproperty:vxokoH",
+                "label": "Sensor Fault Flags",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "sensor_fault_flags",
+                "semanticType": "http://schema.org/Boolean",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "The ID of the sensor",
+                "elementId": "sp:eventproperty:ThjvdL",
+                "label": "Sensor ID",
+                "propertyScope": "DIMENSION_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "sensorId",
+                "semanticType": "https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Denotes the current temperature in degrees celsius",
+                "elementId": "sp:eventproperty:cmnXbe",
+                "label": "Temperature",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "temperature",
+                "semanticType": "http://schema.org/Number",
+                "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                "valueSpecification": {
+                  "@class": 
"org.apache.streampipes.model.schema.QuantitativeValue",
+                  "maxValue": 100,
+                  "minValue": 0,
+                  "step": 0.1
+                }
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "The current timestamp value",
+                "elementId": "sp:eventproperty:AYjMkj",
+                "label": "Timestamp",
+                "propertyScope": "HEADER_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "timestamp",
+                "semanticType": "http://schema.org/DateTime",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Denotes the current volume flow",
+                "elementId": "sp:eventproperty:ORzRsw",
+                "label": "Volume Flow",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "volume_flow",
+                "semanticType": "http://schema.org/Number",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                "valueSpecification": null
+              }
+            ]
+          },
+          "index": 0
+        }
+      ],
+      "selectedEndpointUrl": null,
+      "serviceTagPrefix": "DATA_SINK",
+      "staticProperties": [
+        {
+          "@class": 
"org.apache.streampipes.model.staticproperty.MappingPropertyUnary",
+          "description": "The value which contains a timestamp",
+          "internalName": "timestamp_mapping",
+          "label": "Timestamp Field",
+          "optional": false,
+          "staticPropertyType": "MappingPropertyUnary",
+          "mapsFromOptions": [
+            "s0::timestamp"
+          ],
+          "propertyScope": "NONE",
+          "requirementSelector": "r0::timestamp_mapping",
+          "selectedProperty": "s0::timestamp"
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.staticproperty.FreeTextStaticProperty",
+          "description": "The name of the identifier under which the data is to be stored.",
+          "internalName": "db_measurement",
+          "label": "Identifier",
+          "optional": false,
+          "staticPropertyType": "FreeTextStaticProperty",
+          "htmlAllowed": false,
+          "htmlFontFormat": false,
+          "mapsTo": null,
+          "multiLine": false,
+          "placeholdersSupported": false,
+          "requiredDatatype": "http://www.w3.org/2001/XMLSchema#string",
+          "requiredDomainProperty": null,
+          "value": "test",
+          "valueSpecification": null
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.staticproperty.OneOfStaticProperty",
+          "description": "Update existing schemas with the new one or extend the existing schema with new properties",
+          "internalName": "schema_update",
+          "label": "Schema Update",
+          "optional": false,
+          "staticPropertyType": "OneOfStaticProperty",
+          "horizontalRendering": false,
+          "options": [
+            {
+              "elementId": "sp:option:wzqPbk",
+              "internalName": null,
+              "name": "Update schema",
+              "selected": true
+            },
+            {
+              "elementId": "sp:option:XmVjRp",
+              "internalName": null,
+              "name": "Extend existing schema",
+              "selected": false
+            }
+          ]
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.staticproperty.RuntimeResolvableAnyStaticProperty",
+          "description": "Selected fields will be stored as dimensions.",
+          "internalName": "dimensions_selection",
+          "label": "Dimensions",
+          "optional": false,
+          "staticPropertyType": "RuntimeResolvableAnyStaticProperty",
+          "horizontalRendering": false,
+          "options": [
+            {
+              "elementId": "sp:option:paBEDz",
+              "internalName": null,
+              "name": "sensor_fault_flags",
+              "selected": false
+            },
+            {
+              "elementId": "sp:option:JYOYId",
+              "internalName": null,
+              "name": "sensorId",
+              "selected": true
+            }
+          ],
+          "dependsOn": null
+        },
+        {
+          "@class": 
"org.apache.streampipes.model.staticproperty.SlideToggleStaticProperty",
+          "description": "Fields having the same value than the previous event are not stored. This only affects measurements, not tags.",
+          "internalName": "ignore_duplicates",
+          "label": "Ignore duplicates",
+          "optional": false,
+          "staticPropertyType": "SlideToggleStaticProperty",
+          "defaultValue": false,
+          "selected": false
+        }
+      ],
+      "streamRequirements": [
+        {
+          "@class": "org.apache.streampipes.model.SpDataStream",
+          "appId": "urn:streampipes.apache.org:eventstream:mqYwaw",
+          "connectedTo": null,
+          "description": null,
+          "dom": null,
+          "elementId": "urn:streampipes.apache.org:eventstream:mqYwaw",
+          "includedAssets": [],
+          "includedLocales": [],
+          "includesAssets": false,
+          "includesLocales": false,
+          "internallyManaged": false,
+          "name": null,
+          "rev": null,
+          "category": null,
+          "correspondingAdapterId": null,
+          "eventGrounding": null,
+          "eventSchema": {
+            "eventProperties": [
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": null,
+                "elementId": "sp:eventproperty:LrdSOF",
+                "label": null,
+                "propertyScope": null,
+                "runtimeId": null,
+                "runtimeName": "timestamp_mapping",
+                "semanticType": "http://schema.org/DateTime",
+                "measurementUnit": null,
+                "runtimeType": null,
+                "valueSpecification": null
+              }
+            ]
+          },
+          "index": 0
+        }
+      ],
+      "supportedGrounding": {
+        "transportProtocols": [
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.MqttTransportProtocol",
+            "brokerHostname": null,
+            "elementId": "sp:transportprotocol:IxejrD",
+            "topicDefinition": null,
+            "port": 0
+          },
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.NatsTransportProtocol",
+            "brokerHostname": null,
+            "elementId": "sp:transportprotocol:hMYAXY",
+            "topicDefinition": null,
+            "port": 0
+          },
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.JmsTransportProtocol",
+            "brokerHostname": null,
+            "elementId": "sp:transportprotocol:ycSaOT",
+            "topicDefinition": null,
+            "port": 0
+          },
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.PulsarTransportProtocol",
+            "brokerHostname": "pulsar://localhost:6650",
+            "elementId": "sp:transportprotocol:lvMQWc",
+            "topicDefinition": null
+          },
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+            "brokerHostname": null,
+            "elementId": "sp:transportprotocol:EiyDHT",
+            "topicDefinition": null,
+            "acks": null,
+            "batchSize": null,
+            "groupId": null,
+            "kafkaPort": 0,
+            "lingerMs": null,
+            "maxRequestSize": null,
+            "messageMaxBytes": null,
+            "offset": null,
+            "zookeeperHost": null,
+            "zookeeperPort": 0
+          }
+        ]
+      },
+      "uncompleted": false,
+      "category": [
+        "INTERNAL"
+      ]
+    }
+  ],
+  "description": "",
+  "name": "test",
+  "sepas": [
+    {
+      "@class": "org.apache.streampipes.model.graph.DataProcessorInvocation",
+      "appId": 
"org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
+      "connectedTo": [
+        "jsplumb_0_x6Gz"
+      ],
+      "description": "Inverts the boolean value of the selected field",
+      "dom": "jsplumb_1_j3BB",
+      "elementId": "sp:dataprocessorinvocation:JOqFCM:R0okK",
+      "includedAssets": [
+        "documentation.md",
+        "icon.png"
+      ],
+      "includedLocales": [
+        "strings.en"
+      ],
+      "includesAssets": true,
+      "includesLocales": true,
+      "internallyManaged": false,
+      "name": "Boolean Inverter",
+      "rev": null,
+      "version": 0,
+      "belongsTo": 
"sp:org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
+      "configured": true,
+      "correspondingPipeline": null,
+      "correspondingUser": null,
+      "inputStreams": [
+        {
+          "@class": "org.apache.streampipes.model.SpDataStream",
+          "appId": "urn:streampipes.apache.org:eventstream:VPyWtS",
+          "connectedTo": null,
+          "description": null,
+          "dom": null,
+          "elementId": "urn:streampipes.apache.org:eventstream:VPyWtS",
+          "includedAssets": [],
+          "includedLocales": [],
+          "includesAssets": false,
+          "includesLocales": false,
+          "internallyManaged": false,
+          "name": null,
+          "rev": null,
+          "category": null,
+          "correspondingAdapterId": null,
+          "eventGrounding": {
+            "transportProtocols": [
+              {
+                "@class": 
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+                "brokerHostname": "kafka",
+                "elementId": "sp:transportprotocol:KoHuzT",
+                "topicDefinition": {
+                  "@class": 
"org.apache.streampipes.model.grounding.SimpleTopicDefinition",
+                  "actualTopicName": "e2e-adapter-out-topic-name"
+                },
+                "acks": null,
+                "batchSize": null,
+                "groupId": null,
+                "kafkaPort": 9092,
+                "lingerMs": null,
+                "maxRequestSize": null,
+                "messageMaxBytes": null,
+                "offset": null,
+                "zookeeperHost": null,
+                "zookeeperPort": 0
+              }
+            ]
+          },
+          "eventSchema": {
+            "eventProperties": [
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Denotes the current density of the fluid",
+                "elementId": "sp:eventproperty:Ruvqlo",
+                "label": "Density",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "density",
+                "semanticType": "http://schema.org/Number",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Denotes the current mass flow in the sensor",
+                "elementId": "sp:eventproperty:NyTjgp",
+                "label": "Mass Flow",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "mass_flow",
+                "semanticType": "http://schema.org/Number",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "The ID of the sensor",
+                "elementId": "sp:eventproperty:ThjvdL",
+                "label": "Sensor ID",
+                "propertyScope": "DIMENSION_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "sensorId",
+                "semanticType": "https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Any fault flags of the sensors",
+                "elementId": "sp:eventproperty:vxokoH",
+                "label": "Sensor Fault Flags",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "sensor_fault_flags",
+                "semanticType": "http://schema.org/Boolean",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Denotes the current temperature in degrees celsius",
+                "elementId": "sp:eventproperty:cmnXbe",
+                "label": "Temperature",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "temperature",
+                "semanticType": "http://schema.org/Number",
+                "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                "valueSpecification": {
+                  "@class": 
"org.apache.streampipes.model.schema.QuantitativeValue",
+                  "maxValue": 100,
+                  "minValue": 0,
+                  "step": 0.1
+                }
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "The current timestamp value",
+                "elementId": "sp:eventproperty:AYjMkj",
+                "label": "Timestamp",
+                "propertyScope": "HEADER_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "timestamp",
+                "semanticType": "http://schema.org/DateTime",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+                "valueSpecification": null
+              },
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": "Denotes the current volume flow",
+                "elementId": "sp:eventproperty:ORzRsw",
+                "label": "Volume Flow",
+                "propertyScope": "MEASUREMENT_PROPERTY",
+                "runtimeId": null,
+                "runtimeName": "volume_flow",
+                "semanticType": "http://schema.org/Number",
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                "valueSpecification": null
+              }
+            ]
+          },
+          "index": 0
+        }
+      ],
+      "selectedEndpointUrl": null,
+      "serviceTagPrefix": "DATA_PROCESSOR",
+      "staticProperties": [
+        {
+          "@class": 
"org.apache.streampipes.model.staticproperty.MappingPropertyUnary",
+          "description": "The field which should be inverted",
+          "internalName": "invert-field",
+          "label": "Invert field",
+          "optional": false,
+          "staticPropertyType": "MappingPropertyUnary",
+          "mapsFromOptions": [
+            "s0::sensor_fault_flags"
+          ],
+          "propertyScope": "NONE",
+          "requirementSelector": "r0::invert-field",
+          "selectedProperty": "s0::sensor_fault_flags"
+        }
+      ],
+      "streamRequirements": [
+        {
+          "@class": "org.apache.streampipes.model.SpDataStream",
+          "appId": "urn:streampipes.apache.org:eventstream:VPyWtS",
+          "connectedTo": null,
+          "description": null,
+          "dom": null,
+          "elementId": "urn:streampipes.apache.org:eventstream:VPyWtS",
+          "includedAssets": [],
+          "includedLocales": [],
+          "includesAssets": false,
+          "includesLocales": false,
+          "internallyManaged": false,
+          "name": null,
+          "rev": null,
+          "category": null,
+          "correspondingAdapterId": null,
+          "eventGrounding": null,
+          "eventSchema": {
+            "eventProperties": [
+              {
+                "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+                "additionalMetadata": {},
+                "description": null,
+                "elementId": "sp:eventproperty:sHuzzM",
+                "label": null,
+                "propertyScope": null,
+                "runtimeId": null,
+                "runtimeName": "invert-field",
+                "semanticType": null,
+                "measurementUnit": null,
+                "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+                "valueSpecification": null
+              }
+            ]
+          },
+          "index": 0
+        }
+      ],
+      "supportedGrounding": {
+        "transportProtocols": [
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.MqttTransportProtocol",
+            "brokerHostname": null,
+            "elementId": "sp:transportprotocol:IxejrD",
+            "topicDefinition": null,
+            "port": 0
+          },
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.NatsTransportProtocol",
+            "brokerHostname": null,
+            "elementId": "sp:transportprotocol:hMYAXY",
+            "topicDefinition": null,
+            "port": 0
+          },
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.JmsTransportProtocol",
+            "brokerHostname": null,
+            "elementId": "sp:transportprotocol:ycSaOT",
+            "topicDefinition": null,
+            "port": 0
+          },
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.PulsarTransportProtocol",
+            "brokerHostname": "pulsar://localhost:6650",
+            "elementId": "sp:transportprotocol:lvMQWc",
+            "topicDefinition": null
+          },
+          {
+            "@class": 
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+            "brokerHostname": null,
+            "elementId": "sp:transportprotocol:EiyDHT",
+            "topicDefinition": null,
+            "acks": null,
+            "batchSize": null,
+            "groupId": null,
+            "kafkaPort": 0,
+            "lingerMs": null,
+            "maxRequestSize": null,
+            "messageMaxBytes": null,
+            "offset": null,
+            "zookeeperHost": null,
+            "zookeeperPort": 0
+          }
+        ]
+      },
+      "uncompleted": false,
+      "category": [
+        "BOOLEAN_OPERATOR"
+      ],
+      "outputStrategies": [
+        {
+          "@class": "org.apache.streampipes.model.output.KeepOutputStrategy",
+          "name": null,
+          "renameRules": [],
+          "eventName": null,
+          "keepBoth": false
+        }
+      ],
+      "outputStream": {
+        "@class": "org.apache.streampipes.model.SpDataStream",
+        "appId": "urn:streampipes.apache.org:eventstream:zPxOLe",
+        "connectedTo": null,
+        "description": null,
+        "dom": null,
+        "elementId": "urn:streampipes.apache.org:eventstream:zPxOLe",
+        "includedAssets": [],
+        "includedLocales": [],
+        "includesAssets": false,
+        "includesLocales": false,
+        "internallyManaged": false,
+        "name": null,
+        "rev": null,
+        "category": null,
+        "correspondingAdapterId": null,
+        "eventGrounding": {
+          "transportProtocols": [
+            {
+              "@class": 
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+              "brokerHostname": "kafka",
+              "elementId": null,
+              "topicDefinition": {
+                "@class": 
"org.apache.streampipes.model.grounding.SimpleTopicDefinition",
+                "actualTopicName": 
"org-apache-streampipes-internal-prbfXSLBvVHpoeX"
+              },
+              "acks": null,
+              "batchSize": null,
+              "groupId": null,
+              "kafkaPort": 9092,
+              "lingerMs": null,
+              "maxRequestSize": null,
+              "messageMaxBytes": null,
+              "offset": null,
+              "zookeeperHost": "zookeeper",
+              "zookeeperPort": 2181
+            }
+          ]
+        },
+        "eventSchema": {
+          "eventProperties": [
+            {
+              "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+              "additionalMetadata": {},
+              "description": "Denotes the current density of the fluid",
+              "elementId": "sp:eventproperty:Ruvqlo",
+              "label": "Density",
+              "propertyScope": "MEASUREMENT_PROPERTY",
+              "runtimeId": null,
+              "runtimeName": "density",
+              "semanticType": "http://schema.org/Number",
+              "measurementUnit": null,
+              "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+              "valueSpecification": null
+            },
+            {
+              "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+              "additionalMetadata": {},
+              "description": "Denotes the current mass flow in the sensor",
+              "elementId": "sp:eventproperty:NyTjgp",
+              "label": "Mass Flow",
+              "propertyScope": "MEASUREMENT_PROPERTY",
+              "runtimeId": null,
+              "runtimeName": "mass_flow",
+              "semanticType": "http://schema.org/Number",
+              "measurementUnit": null,
+              "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+              "valueSpecification": null
+            },
+            {
+              "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+              "additionalMetadata": {},
+              "description": "The ID of the sensor",
+              "elementId": "sp:eventproperty:ThjvdL",
+              "label": "Sensor ID",
+              "propertyScope": "DIMENSION_PROPERTY",
+              "runtimeId": null,
+              "runtimeName": "sensorId",
+              "semanticType": "https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+              "measurementUnit": null,
+              "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+              "valueSpecification": null
+            },
+            {
+              "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+              "additionalMetadata": {},
+              "description": "Any fault flags of the sensors",
+              "elementId": "sp:eventproperty:vxokoH",
+              "label": "Sensor Fault Flags",
+              "propertyScope": "MEASUREMENT_PROPERTY",
+              "runtimeId": null,
+              "runtimeName": "sensor_fault_flags",
+              "semanticType": "http://schema.org/Boolean",
+              "measurementUnit": null,
+              "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+              "valueSpecification": null
+            },
+            {
+              "@class": 
"org.apache.streampipes.model.schema.EventPropertyPrimitive",
+              "additionalMetadata": {},
+              "description": "Denotes the current temperature in degrees celsius",
+              "elementId": "sp:eventproperty:cmnXbe",
+              "label": "Temperature",
+              "propertyScope": "MEASUREMENT_PROPERTY",
+              "runtimeId": null,
+              "runtimeName": "temperature",
+              "semanticType": "http://schema.org/Number",
+              "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+              "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+              "valueSpecification": {
+                "@class": 
"org.apache.streampipes.model.schema.QuantitativeValue",
+                "maxValue": 100,
+                "minValue": 0,
+                "step": 0.1
+              }
+            },
+            {
+              "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
+              "additionalMetadata": {},
+              "description": "The current timestamp value",
+              "elementId": "sp:eventproperty:AYjMkj",
+              "label": "Timestamp",
+              "propertyScope": "HEADER_PROPERTY",
+              "runtimeId": null,
+              "runtimeName": "timestamp",
+              "semanticType": "http://schema.org/DateTime",
+              "measurementUnit": null,
+              "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+              "valueSpecification": null
+            },
+            {
+              "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
+              "additionalMetadata": {},
+              "description": "Denotes the current volume flow",
+              "elementId": "sp:eventproperty:ORzRsw",
+              "label": "Volume Flow",
+              "propertyScope": "MEASUREMENT_PROPERTY",
+              "runtimeId": null,
+              "runtimeName": "volume_flow",
+              "semanticType": "http://schema.org/Number",
+              "measurementUnit": null,
+              "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+              "valueSpecification": null
+            }
+          ]
+        },
+        "index": 0
+      }
+    }
+  ],
+  "streams": [
+    {
+      "@class": "org.apache.streampipes.model.SpDataStream",
+      "appId": "e2e-adapter-id",
+      "connectedTo": null,
+      "description": "",
+      "dom": "jsplumb_0_x6Gz",
+      "elementId": "e2e-adapter-id",
+      "includedAssets": [],
+      "includedLocales": [],
+      "includesAssets": false,
+      "includesLocales": false,
+      "internallyManaged": true,
+      "name": "e2e-adapter-name",
+      "rev": "e2e-stream-rev",
+      "category": null,
+      "correspondingAdapterId": "sp:adapterdescription:OiTAHK",
+      "eventGrounding": {
+        "transportProtocols": [
+          {
+            "@class": "org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+            "brokerHostname": "kafka",
+            "elementId": "sp:transportprotocol:KoHuzT",
+            "topicDefinition": {
+              "@class": "org.apache.streampipes.model.grounding.SimpleTopicDefinition",
+              "actualTopicName": "e2e-adapter-out-topic-name"
+            },
+            "acks": null,
+            "batchSize": null,
+            "groupId": null,
+            "kafkaPort": 9092,
+            "lingerMs": null,
+            "maxRequestSize": null,
+            "messageMaxBytes": null,
+            "offset": null,
+            "zookeeperHost": null,
+            "zookeeperPort": 0
+          }
+        ]
+      },
+      "eventSchema": {
+        "eventProperties": [
+          {
+            "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
+            "additionalMetadata": {},
+            "description": "Denotes the current density of the fluid",
+            "elementId": "sp:eventproperty:Ruvqlo",
+            "label": "Density",
+            "propertyScope": "MEASUREMENT_PROPERTY",
+            "runtimeId": null,
+            "runtimeName": "density",
+            "semanticType": "http://schema.org/Number",
+            "measurementUnit": null,
+            "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+            "valueSpecification": null
+          },
+          {
+            "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
+            "additionalMetadata": {},
+            "description": "Denotes the current mass flow in the sensor",
+            "elementId": "sp:eventproperty:NyTjgp",
+            "label": "Mass Flow",
+            "propertyScope": "MEASUREMENT_PROPERTY",
+            "runtimeId": null,
+            "runtimeName": "mass_flow",
+            "semanticType": "http://schema.org/Number",
+            "measurementUnit": null,
+            "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+            "valueSpecification": null
+          },
+          {
+            "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
+            "additionalMetadata": {},
+            "description": "The ID of the sensor",
+            "elementId": "sp:eventproperty:ThjvdL",
+            "label": "Sensor ID",
+            "propertyScope": "DIMENSION_PROPERTY",
+            "runtimeId": null,
+            "runtimeName": "sensorId",
+            "semanticType": "https://streampipes.org/vocabulary/examples/watertank/v1/hasSensorId",
+            "measurementUnit": null,
+            "runtimeType": "http://www.w3.org/2001/XMLSchema#string",
+            "valueSpecification": null
+          },
+          {
+            "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
+            "additionalMetadata": {},
+            "description": "Any fault flags of the sensors",
+            "elementId": "sp:eventproperty:vxokoH",
+            "label": "Sensor Fault Flags",
+            "propertyScope": "MEASUREMENT_PROPERTY",
+            "runtimeId": null,
+            "runtimeName": "sensor_fault_flags",
+            "semanticType": "http://schema.org/Boolean",
+            "measurementUnit": null,
+            "runtimeType": "http://www.w3.org/2001/XMLSchema#boolean",
+            "valueSpecification": null
+          },
+          {
+            "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
+            "additionalMetadata": {},
+            "description": "Denotes the current temperature in degrees celsius",
+            "elementId": "sp:eventproperty:cmnXbe",
+            "label": "Temperature",
+            "propertyScope": "MEASUREMENT_PROPERTY",
+            "runtimeId": null,
+            "runtimeName": "temperature",
+            "semanticType": "http://schema.org/Number",
+            "measurementUnit": "http://qudt.org/vocab/unit#DegreeCelsius",
+            "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+            "valueSpecification": {
+              "@class": "org.apache.streampipes.model.schema.QuantitativeValue",
+              "maxValue": 100,
+              "minValue": 0,
+              "step": 0.1
+            }
+          },
+          {
+            "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
+            "additionalMetadata": {},
+            "description": "The current timestamp value",
+            "elementId": "sp:eventproperty:AYjMkj",
+            "label": "Timestamp",
+            "propertyScope": "HEADER_PROPERTY",
+            "runtimeId": null,
+            "runtimeName": "timestamp",
+            "semanticType": "http://schema.org/DateTime",
+            "measurementUnit": null,
+            "runtimeType": "http://www.w3.org/2001/XMLSchema#long",
+            "valueSpecification": null
+          },
+          {
+            "@class": "org.apache.streampipes.model.schema.EventPropertyPrimitive",
+            "additionalMetadata": {},
+            "description": "Denotes the current volume flow",
+            "elementId": "sp:eventproperty:ORzRsw",
+            "label": "Volume Flow",
+            "propertyScope": "MEASUREMENT_PROPERTY",
+            "runtimeId": null,
+            "runtimeName": "volume_flow",
+            "semanticType": "http://schema.org/Number",
+            "measurementUnit": null,
+            "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+            "valueSpecification": null
+          }
+        ]
+      },
+      "index": 0
+    }
+  ],
+  "valid": true
+}
\ No newline at end of file
diff --git 
a/streampipes-client-e2e/go-client-e2e/utils/create_machine_apadter.go 
b/streampipes-client-e2e/go-client-e2e/utils/create_machine_apadter.go
new file mode 100644
index 0000000000..2f9034c1f1
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/utils/create_machine_apadter.go
@@ -0,0 +1,37 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package utils
+
+import (
+       "io/ioutil"
+       "os"
+)
+
+// CreateData loads the file at jsonFilePath and returns its raw bytes.
+// It returns nil when the file cannot be opened or read.
+func CreateData(jsonFilePath string) []byte {
+       source, openErr := os.Open(jsonFilePath)
+       if openErr != nil {
+               return nil
+       }
+       defer source.Close()
+
+       if payload, readErr := ioutil.ReadAll(source); readErr == nil {
+               return payload
+       }
+       return nil
+}
diff --git a/streampipes-client-e2e/go-client-e2e/utils/streampipes_client.go 
b/streampipes-client-e2e/go-client-e2e/utils/streampipes_client.go
new file mode 100644
index 0000000000..90c2b87203
--- /dev/null
+++ b/streampipes-client-e2e/go-client-e2e/utils/streampipes_client.go
@@ -0,0 +1,36 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package utils
+
+import (
+       "github.com/apache/streampipes/streampipes-client-go/streampipes"
+       "github.com/apache/streampipes/streampipes-client-go/streampipes/config"
+       "os"
+)
+
+// CreateStreamPipesClient builds a StreamPipes client from the last four
+// elements of os.Args, read positionally from the end: host, port, API key,
+// user name.
+// NOTE(review): when os.Args holds fewer than four elements the index
+// expressions below panic; confirm every caller passes all four values.
+func CreateStreamPipesClient() (*streampipes.StreamPipesClient, error) {
+       length := len(os.Args)
+       clientConfig := config.StreamPipesClientConfig{
+               // Base URL is assembled as http://<host>:<port>.
+               Url: "http://" + os.Args[length-4] + ":" + os.Args[length-3],
+               Credential: config.StreamPipesApiKeyCredentials{
+                       UserName: os.Args[length-1],
+                       ApiKey:   os.Args[length-2],
+               },
+       }
+       return streampipes.NewStreamPipesClient(clientConfig)
+}
diff --git a/streampipes-client-e2e/tool/go-client-e2e.sh 
b/streampipes-client-e2e/tool/go-client-e2e.sh
new file mode 100644
index 0000000000..3b52e89e2e
--- /dev/null
+++ b/streampipes-client-e2e/tool/go-client-e2e.sh
@@ -0,0 +1,61 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set environment variables
+HOST=""
+PORT=""
+APIKEY=""
+API_KEY_USER_NAME=""
+
+while true; do
+    case "$1" in
+        -h)
+            HOST="$2"
+            shift 2
+        ;;
+        -p)
+            PORT="$2"
+            shift 2
+        ;;
+        -u)
+            API_KEY_USER_NAME="$2"
+            shift 2
+        ;;
+        -k)
+            APIKEY="$2"
+            shift 2
+        ;;
+        "")
+            #skip directly
+            break
+        ;;
+        *)
+          #skip directly
+            shift
+        ;;
+    esac
+done
+
+# Run every Go e2e test package, forwarding the connection settings as
+# positional test arguments in the order: host, port, API key, user name.
+cd ../go-client-e2e || exit
+if ! go test -v ../go-client-e2e/... -args "$HOST" "$PORT" "$APIKEY" "$API_KEY_USER_NAME"; then
+    # Return to the tool directory before reporting the failure.
+    cd ../tool
+    echo "Error: go test failed"
+    exit 1
+fi
+cd ../tool || exit
+echo "All tests passed successfully"
\ No newline at end of file
diff --git a/streampipes-client-e2e/tool/install-element.sh 
b/streampipes-client-e2e/tool/install-element.sh
new file mode 100644
index 0000000000..2b3c340f7a
--- /dev/null
+++ b/streampipes-client-e2e/tool/install-element.sh
@@ -0,0 +1,113 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# REST path, appended to http://HOST:PORT, that receives the install requests below.
+INSTALL_ELEMENT_URL="/streampipes-backend/api/v2/extension-installation"
+
+# Defaults; each can be overridden on the command line.
+HOST="127.0.0.1"
+PORT="8030"
+TOKEN=""
+
+
+# Parse options:
+#   -host <ip>   -port <port>   -token <access token>   --help
+# Arguments that are not a known option are skipped.
+# The loop is bounded by "$#": "shift 2" fails without shifting when only one
+# argument is left, so an unconditional loop would spin forever on a trailing
+# flag that has no value.
+while [ $# -gt 0 ]; do
+    case "$1" in
+        -host|-port|-token)
+            if [ $# -lt 2 ]; then
+                echo "Error: option $1 requires a value" >&2
+                exit 1
+            fi
+            case "$1" in
+                -host)  HOST="$2" ;;
+                -port)  PORT="$2" ;;
+                -token) TOKEN="$2" ;;
+            esac
+            shift 2
+        ;;
+        --help)
+            # TOKEN is sent as a Bearer token, so the placeholder names a token.
+            echo "Usage: $0 [-host <ip>] [-port <port>] [-token <token>]"
+            exit 0
+        ;;
+        *)
+            #skip directly
+            shift
+        ;;
+    esac
+done
+
+
+######################Adapters######################
+
+#machine
+installRequestBody='{
+  "appId":"org.apache.streampipes.connect.iiot.adapters.simulator.machine",
+  "publicElement":true,
+  "serviceTagPrefix":"ADAPTER"
+  }'
+
+# POST the install request. Only a non-zero curl exit status is treated as a
+# failure; the content of the response is not inspected.
+if ! response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL" \
+   -H "Content-Type: application/json" \
+   -H "authorization: Bearer $TOKEN" \
+   -d "$installRequestBody"); then
+    echo "$response"
+    echo "Error install machine adapter"
+    exit 1
+fi
+
+
+
+######################processor######################
+
+#bool_inverter
+installRequestBody='{
+  "appId":"org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
+  "publicElement":true,
+  "serviceTagPrefix":"DATA_PROCESSOR"
+  }'
+
+# POST the install request. Only a non-zero curl exit status is treated as a
+# failure; the content of the response is not inspected.
+if ! response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL" \
+   -H "Content-Type: application/json" \
+   -H "authorization: Bearer $TOKEN" \
+   -d "$installRequestBody"); then
+    echo "$response"
+    echo "Error install bool inverter processor"
+    exit 1
+fi
+
+
+######################sink######################
+#datalake
+installRequestBody='{
+  "appId":"org.apache.streampipes.sinks.internal.jvm.datalake",
+  "publicElement":true,
+  "serviceTagPrefix":"DATA_SINK"
+  }'
+
+# POST the install request. Only a non-zero curl exit status is treated as a
+# failure; the content of the response is not inspected.
+if ! response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL" \
+   -H "Content-Type: application/json" \
+   -H "authorization: Bearer $TOKEN" \
+   -d "$installRequestBody"); then
+    echo "$response"
+    echo "Error install datalake"
+    exit 1
+fi
\ No newline at end of file
diff --git a/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh 
b/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
new file mode 100644
index 0000000000..cbab40355f
--- /dev/null
+++ b/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
@@ -0,0 +1,148 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Defaults; each can be overridden on the command line.
+E2E_TEST=""
+HOST="127.0.0.1"
+PORT="8030"
+LOGIN_URL="/streampipes-backend/api/v2/auth/login"
+
+SP_USERNAME="[email protected]"
+SP_PASSWORD="admin"
+
+# Parse options:
+#   -u <username>  -pw <password>  -h <ip>  -p <port>  -t <E2E_TEST>  --help
+# Arguments that are not a known option are skipped.
+# The loop is bounded by "$#": "shift 2" fails without shifting when only one
+# argument is left, so an unconditional loop would spin forever on a trailing
+# flag that has no value.
+while [ $# -gt 0 ]; do
+    case "$1" in
+        -u|-pw|-h|-p|-t)
+            if [ $# -lt 2 ]; then
+                echo "Error: option $1 requires a value" >&2
+                exit 1
+            fi
+            case "$1" in
+                -u)  SP_USERNAME="$2" ;;
+                -pw) SP_PASSWORD="$2" ;;
+                -h)  HOST="$2" ;;
+                -p)  PORT="$2" ;;
+                -t)  E2E_TEST="$2" ;;
+            esac
+            shift 2
+        ;;
+        --help)
+            echo "Usage: $0 [-h <ip>] [-p <port>] [-u <username>] [-pw <password>] [-t <E2E_TEST>]"
+            exit 0
+        ;;
+        *)
+            #skip directly
+            shift
+        ;;
+    esac
+done
+
+if [ E2E_TEST == "" ]; then
+    echo "-t is empty"
+    exit 1
+fi
+
+API_KEY_URL="/streampipes-backend/api/v2/users/$SP_USERNAME/tokens"
+API_KEY_USER_NAME="$SP_USERNAME"
+
+loginRequestBody='{
+  "username": "'"$SP_USERNAME"'",
+  "password": "'"$SP_PASSWORD"'"
+}'
+
+if nc -zv localhost 8030; then
+    echo "Port 8030 is open and listening."
+else
+    echo "Port 8030 is not open or not listening."
+    exit 1
+fi
+
+echo "start login"
+max_attempts=60
+attempt=1
+# Retry the login request once per second until curl exits successfully.
+# Giving up after max_attempts is a failure, so the script exits non-zero.
+while [ $attempt -le $max_attempts ]
+do
+    if response=$(curl -s -X POST "http://$HOST:$PORT$LOGIN_URL" \
+        -H "Content-Type: application/json" \
+        -d "$loginRequestBody"); then
+        echo "Login successful"
+        break
+    fi
+    echo "$response"
+    echo "Error: Login request failed on attempt $attempt"
+    if [ $attempt -eq $max_attempts ]; then
+        echo "Max attempts reached. Exiting."
+        exit 1
+    fi
+    echo "Retrying in 1 second..."
+    sleep 1
+    attempt=$((attempt+1))
+done
+
+echo "get token"
+accessToken=$(echo "$response" | sed -n 's/.*"accessToken":"\([^"]*\)".*/\1/p')
+if [ -z "$accessToken" ]; then
+    echo "Error: Failed to retrieve access token"
+    exit 1
+fi
+
+apiKeyRequestBody='{
+  "tokenName": "'"$API_KEY_USER_NAME"'"
+}'
+
+echo "install element"
+chmod +x ./install-element.sh
+./install-element.sh -host "$HOST" -port "$PORT" -token "$accessToken"
+
+echo "get apikey"
+# Get APIKEY
+if ! APIKEYRESP=$(curl -s -X POST "http://$HOST:$PORT$API_KEY_URL" \
+   -H "Content-Type: application/json" \
+   -H "authorization: Bearer $accessToken" \
+   -d "$apiKeyRequestBody"); then
+    echo "Error: API Key request failed"
+    exit 1
+fi
+# "sed -n ... p" prints only when rawToken is present. Without it sed echoes
+# the whole response on a non-match and the empty check below never fires.
+APIKEY=$(echo "$APIKEYRESP" | sed -n 's/.*"rawToken":"\([^"]*\)".*/\1/p')
+if [ -z "$APIKEY" ]; then
+    echo "Error: Failed to retrieve API key"
+    exit 1
+fi
+
+echo "start e2e test"
+chmod +x ./"$E2E_TEST"
+# Hand host, port, user name and API key to the selected test script and
+# propagate its failure.
+if ! ./"$E2E_TEST" -h "$HOST" -p "$PORT" -u "$API_KEY_USER_NAME" -k "$APIKEY"; then
+    echo "start $E2E_TEST failed"
+    exit 1
+fi
+
+echo "All tests passed successfully"
\ No newline at end of file
diff --git a/streampipes-client-go/streampipes/adapter_api.go 
b/streampipes-client-go/streampipes/adapter_api.go
index 06fb42d789..d2f7eb7637 100644
--- a/streampipes-client-go/streampipes/adapter_api.go
+++ b/streampipes-client-go/streampipes/adapter_api.go
@@ -120,13 +120,9 @@ func (a *Adapter) DeleteSingleAdapter(adapterId string) 
error {
        return nil
 }
 
-func (a *Adapter) CreateAdapter(adapters adapter.AdapterDescription) error {
+func (a *Adapter) CreateAdapter(adapters []byte) error {
        endPointUrl := util.NewStreamPipesApiPath(a.config.Url, 
"streampipes-backend/api/v2/connect/master/adapters", nil)
-       body, err := serializer.NewAdapterSerializer().Marshal(adapters)
-       if err != nil {
-               return err
-       }
-       response, err := a.executeRequest("PUT", endPointUrl, body)
+       response, err := a.executeRequest("POST", endPointUrl, adapters)
        if err != nil {
                return err
        }
@@ -158,8 +154,7 @@ func (a *Adapter) StopSingleAdapter(adapterId string) error 
{
 }
 
 func (a *Adapter) StartSingleAdapter(adapterId string) error {
-       endPointUrl := util.NewStreamPipesApiPath(a.config.Url, 
"streampipes-backend/api/v2/pipelines", []string{adapterId, "start"})
-
+       endPointUrl := util.NewStreamPipesApiPath(a.config.Url, 
"streampipes-backend/api/v2/connect/master/adapters", []string{adapterId, 
"start"})
        response, err := a.executeRequest("POST", endPointUrl, nil)
        if err != nil {
                return err
diff --git 
a/streampipes-client-go/streampipes/internal/serializer/deserializer.go 
b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
index 9c47fd3383..27283c85b4 100644
--- a/streampipes-client-go/streampipes/internal/serializer/deserializer.go
+++ b/streampipes-client-go/streampipes/internal/serializer/deserializer.go
@@ -20,6 +20,8 @@ package serializer
 import (
        "encoding/json"
        
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/adapter"
+       "log"
+       "strings"
 
        "github.com/apache/streampipes/streampipes-client-go/streampipes/model"
        
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/data_lake"
@@ -45,8 +47,10 @@ func NewDataLakeMeasuresDeserializer() 
*DataLakeMeasuresDeserializer {
 
 func (d DataLakeMeasuresDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
        var dataLakeMeasures []data_lake.DataLakeMeasure
-       err := json.Unmarshal(data, &dataLakeMeasures)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&dataLakeMeasures); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return dataLakeMeasures, nil
@@ -60,8 +64,10 @@ func NewDataLakeMeasureDeserializer() 
*DataLakeMeasureDeserializer {
 
 func (d DataLakeMeasureDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
        var dataLakeMeasure data_lake.DataLakeMeasure
-       err := json.Unmarshal(data, &dataLakeMeasure)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&dataLakeMeasure); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return dataLakeMeasure, nil
@@ -75,8 +81,10 @@ func NewDataSeriesDeserializer() *DataSeriesDeserializer {
 
 func (d DataSeriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
        var dataSeries data_lake.DataSeries
-       err := json.Unmarshal(data, &dataSeries)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&dataSeries); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return dataSeries, nil
@@ -89,12 +97,14 @@ func NewStreamPipesVersionDeserializer() 
*StreamPipesVersionDeserializer {
 }
 
 func (d StreamPipesVersionDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
-       var dataSeries streampipes_version.Versions
-       err := json.Unmarshal(data, &dataSeries)
-       if err != nil {
+       var version streampipes_version.Versions
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&version); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
-       return dataSeries, nil
+       return version, nil
 }
 
 type ResponseMessageDeserializer struct{}
@@ -105,8 +115,10 @@ func NewResponseMessageDeserializer() 
*ResponseMessageDeserializer {
 
 func (r ResponseMessageDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
        var responseMessage model.ResponseMessage
-       err := json.Unmarshal(data, &responseMessage)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&responseMessage); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return responseMessage, nil
@@ -120,10 +132,15 @@ func NewPipelineCategoriesDeserializer() 
*PipelineCategoriesDeserializer {
 
 func (p PipelineCategoriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
        var pipelineCategory []pipeline.PipelineCategory
-       err := json.Unmarshal(data, &pipelineCategory)
-       if err != nil {
+
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+
+       // Unknown-field errors are tolerated; any other decode error is logged
+       // and returned. log.Println is used, as in the sibling deserializers:
+       // log.Fatal would exit the process and make the return unreachable.
+       if err := dec.Decode(&pipelineCategory); err != nil && !strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
+
        return pipelineCategory, nil
 }
 
@@ -135,8 +152,10 @@ func NewDataLakeDashboardDeserializer() 
*DataLakeDashboardDeserializer {
 
 func (d DataLakeDashboardDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
        var dashborad data_lake.Dashboard
-       err := json.Unmarshal(data, &dashborad)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&dashborad); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return dashborad, nil
@@ -149,13 +168,15 @@ func NewDataLakeDashboardsDeserializer() 
*DataLakeDashboardsDeserializer {
 }
 
 func (d DataLakeDashboardsDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
-       var dashborad []data_lake.Dashboard
-       err := json.Unmarshal(data, &dashborad)
-       if err != nil {
+       var dashborads []data_lake.Dashboard
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&dashborads); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
 
-       return dashborad, nil
+       return dashborads, nil
 }
 
 type DataLakeWidgetDeserializer struct{}
@@ -166,8 +187,10 @@ func NewDataLakeWidgetDeserializer() 
*DataLakeWidgetDeserializer {
 
 func (d DataLakeWidgetDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
        var widget data_lake.DataExplorerWidgetModel
-       err := json.Unmarshal(data, &widget)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&widget); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return widget, nil
@@ -180,12 +203,14 @@ func NewDataLakeWidgetsDeserializer() 
*DataLakeWidgetsDeserializer {
 }
 
 func (d DataLakeWidgetsDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
-       var widget []data_lake.DataExplorerWidgetModel
-       err := json.Unmarshal(data, &widget)
-       if err != nil {
+       var widgets []data_lake.DataExplorerWidgetModel
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&widgets); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
-       return widget, nil
+       return widgets, nil
 }
 
 type SpLogEntriesDeserializer struct{}
@@ -194,13 +219,15 @@ func NewSpLogEntriesDeserializer() 
*SpLogEntriesDeserializer {
        return &SpLogEntriesDeserializer{}
 }
 
-func (p SpLogEntriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
-       var userAccount []functions.SpLogEntry
-       err := json.Unmarshal(data, &userAccount)
-       if err != nil {
+func (s SpLogEntriesDeserializer) Unmarshal(data []byte) (interface{}, error) {
+       var spLogEntry []functions.SpLogEntry
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&spLogEntry); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
-       return userAccount, nil
+       return spLogEntry, nil
 }
 
 type SpMetricsEntryDeserializer struct{}
@@ -209,10 +236,12 @@ func NewSpMetricsEntryDeserializer() 
*SpMetricsEntryDeserializer {
        return &SpMetricsEntryDeserializer{}
 }
 
-func (p SpMetricsEntryDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
+func (s SpMetricsEntryDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
        var spMetricsEntry functions.SpMetricsEntry
-       err := json.Unmarshal(data, &spMetricsEntry)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&spMetricsEntry); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return spMetricsEntry, nil
@@ -224,10 +253,12 @@ func NewFunctionDefinitionsDeserializer() 
*FunctionDefinitionsDeserializer {
        return &FunctionDefinitionsDeserializer{}
 }
 
-func (p FunctionDefinitionsDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
+func (f FunctionDefinitionsDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
        var functionDefinitions []functions.FunctionDefinition
-       err := json.Unmarshal(data, &functionDefinitions)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&functionDefinitions); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return functionDefinitions, nil
@@ -242,8 +273,10 @@ func NewShortUserInfosDeserializer() 
*ShortUserInfosDeserializer {
 
 func (s ShortUserInfosDeserializer) Unmarshal(data []byte) (interface{}, 
error) {
        var shortUserInfo []streampipes_user.ShortUserInfo
-       err := json.Unmarshal(data, &shortUserInfo)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&shortUserInfo); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return shortUserInfo, nil
@@ -257,8 +290,10 @@ func NewUserAccountDeserializer() *UserAccountDeserializer 
{
 
 func (p UserAccountDeserializer) Unmarshal(data []byte) (interface{}, error) {
        var userAccount streampipes_user.UserAccount
-       err := json.Unmarshal(data, &userAccount)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&userAccount); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return userAccount, nil
@@ -273,10 +308,13 @@ func NewPipelineDeserializer() *PipelineDeserializer {
 
 func (p PipelineDeserializer) Unmarshal(data []byte) (interface{}, error) {
        var pipeLine pipeline.Pipeline
-       err := json.Unmarshal(data, &pipeLine)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&pipeLine); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
+
        return pipeLine, nil
 
 }
@@ -289,8 +327,11 @@ func NewPipelinesDeserializer() *PipelinesDeserializer {
 
 func (p PipelinesDeserializer) Unmarshal(data []byte) (interface{}, error) {
        var pipelines []pipeline.Pipeline
-       err := json.Unmarshal(data, &pipelines)
-       if err != nil {
+
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&pipelines); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return pipelines, nil
@@ -305,8 +346,10 @@ func NewPipelineStatusMessagesDeserializer() 
*PipelineStatusMessagesDeserializer
 
 func (p PipelineStatusMessagesDeserializer) Unmarshal(data []byte) 
(interface{}, error) {
        var pipelineStatusMessage []pipeline.PipelineStatusMessage
-       err := json.Unmarshal(data, &pipelineStatusMessage)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&pipelineStatusMessage); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return pipelineStatusMessage, nil
@@ -320,8 +363,10 @@ func NewPipelineOperationStatusDeserializer() 
*PipelineOperationStatusDeserializ
 
 func (p PipelineOperationStatusDeserializer) Unmarshal(data []byte) 
(interface{}, error) {
        var pipelineOperationStatus pipeline.PipelineOperationStatus
-       err := json.Unmarshal(data, &pipelineOperationStatus)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+       if err := dec.Decode(&pipelineOperationStatus); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err)
                return nil, err
        }
        return pipelineOperationStatus, nil
@@ -335,8 +380,11 @@ func NewAdapterDeserializer() *AdapterDeserializer {
 
 func (a AdapterDeserializer) Unmarshal(data []byte) (interface{}, error) {
        var adapterDescription adapter.AdapterDescription
-       err := json.Unmarshal(data, &adapterDescription)
-       if err != nil {
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+
+       if err := dec.Decode(&adapterDescription); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err) // log only: log.Fatal would exit the process before the return below
                return nil, err
        }
        return adapterDescription, nil
@@ -351,8 +399,12 @@ func NewAdaptersDeserializer() *AdaptersDeserializer {
 
 func (a AdaptersDeserializer) Unmarshal(data []byte) (interface{}, error) {
        var adapters []adapter.AdapterDescription
-       err := json.Unmarshal(data, &adapters)
-       if err != nil {
+
+       dec := json.NewDecoder(strings.NewReader(string(data)))
+       dec.DisallowUnknownFields()
+
+       if err := dec.Decode(&adapters); err != nil && 
!strings.Contains(err.Error(), "unknown field") {
+               log.Println(err) // log only: log.Fatal would exit the process before the return below
                return nil, err
        }
        return adapters, nil
diff --git 
a/streampipes-client-go/streampipes/model/adapter/adapter_description.go 
b/streampipes-client-go/streampipes/model/adapter/adapter_description.go
index 4b856d32fa..7180442721 100644
--- a/streampipes-client-go/streampipes/model/adapter/adapter_description.go
+++ b/streampipes-client-go/streampipes/model/adapter/adapter_description.go
@@ -20,6 +20,7 @@ package adapter
 import "github.com/apache/streampipes/streampipes-client-go/streampipes/model"
 
 type AdapterDescription struct {
+       Class                            string                                 
`json:"@class"`
        ElementID                        string                                 
`json:"elementId"`
        Rev                              string                                 
`json:"rev"`
        DOM                              string                                 
`json:"dom"`
diff --git a/streampipes-client-go/streampipes/model/common.go 
b/streampipes-client-go/streampipes/model/common.go
index d5a81d684b..b353308556 100644
--- a/streampipes-client-go/streampipes/model/common.go
+++ b/streampipes-client-go/streampipes/model/common.go
@@ -86,6 +86,51 @@ type StaticProperty struct {
        Class              string `json:"@class"`
 }
 
+type MappingProperty struct {
+       StaticProperty
+       RequirementSelector string
+       MapsFromOptions     []string
+       PropertyScope       string
+}
+
+type MappingPropertyUnary struct {
+       MappingProperty
+       SelectedProperty string
+}
+
+type FreeTextStaticProperty struct {
+       StaticProperty
+       Value                 string // exported: encoding/json ignores unexported fields
+       MapsTo                string
+       MultiLine             bool
+       HtmlAllowed           bool
+       HtmlFontFormat        bool
+       PlaceholdersSupported bool
+}
+
+type SelectionStaticProperty struct {
+       HorizontalRendering bool
+}
+
+type OneOfStaticProperty struct {
+       SelectionStaticProperty
+}
+
+type AnyStaticProperty struct {
+       SelectionStaticProperty
+}
+
+type RuntimeResolvableAnyStaticProperty struct {
+       AnyStaticProperty
+       DependsOn []string
+}
+
+type SlideToggleStaticProperty struct {
+       StaticProperty
+       Selected     bool
+       DefaultValue bool
+}
+
 type SpDataStream struct {
        ElementId              string         `json:"elementId"`
        Dom                    string         `json:"dom"`
diff --git a/streampipes-client-go/streampipes/model/pipeline/pipeline.go 
b/streampipes-client-go/streampipes/model/pipeline/pipeline.go
index 0586be659d..65273cd227 100644
--- a/streampipes-client-go/streampipes/model/pipeline/pipeline.go
+++ b/streampipes-client-go/streampipes/model/pipeline/pipeline.go
@@ -30,23 +30,23 @@ const (
 )
 
 type Pipeline struct {
-       Sepas                 []DataProcessorInvocation //`json:"sepas"`
-       Streams               []model.SpDataStream      //`json:"streams"`
-       Name                  string                    //`json:"name"`
-       Description           string                    
//`json:"description,omitempty"`
-       Actions               []DataSinkInvocation      //`json:"actions"`
-       Running               bool                      //`json:"running"`
-       RestartOnSystemReboot bool                      
//`json:"restartOnSystemReboot"`
-       Valid                 bool                      //`json:"valid"`
-       StartedAt             int64                     
//`json:"startedAt,omitempty"`
-       CreatedAt             int64                     //`json:"createdAt"`
-       PublicElement         bool                      //`json:"publicElement"`
-       CreatedByUser         string                    //`json:"createdByUser"`
-       PipelineCategories    []string                  
//`json:"pipelineCategories"`
-       PipelineNotifications []string                  
//`json:"pipelineNotifications"`
-       HealthStatus          PipelineHealthStatus      //`json:"healthStatus"`
-       ID                    string                    //`json:"_id,omitempty"`
-       Rev                   string                    
//`json:"_rev,omitempty"`
+       Sepas                 []DataProcessorInvocation `json:"sepas"`
+       Streams               []model.SpDataStream      `json:"streams"`
+       Name                  string                    `json:"name"`
+       Description           string                    
`json:"description,omitempty"`
+       Actions               []DataSinkInvocation      `json:"actions"`
+       Running               bool                      `json:"running"`
+       RestartOnSystemReboot bool                      
`json:"restartOnSystemReboot"`
+       Valid                 bool                      `json:"valid"`
+       StartedAt             int64                     
`json:"startedAt,omitempty"`
+       CreatedAt             int64                     `json:"createdAt"`
+       PublicElement         bool                      `json:"publicElement"`
+       CreatedByUser         string                    `json:"createdByUser"`
+       PipelineCategories    []string                  
`json:"pipelineCategories"`
+       PipelineNotifications []string                  
`json:"pipelineNotifications"`
+       HealthStatus          PipelineHealthStatus      `json:"healthStatus"`
+       PipelineId            string                    `json:"_id"`
+       Rev                   string                    `json:"_rev"`
 }
 
 type DataProcessorInvocation struct {
diff --git a/streampipes-client-go/streampipes/pipeline_api.go 
b/streampipes-client-go/streampipes/pipeline_api.go
index e58ccc7096..f507134685 100644
--- a/streampipes-client-go/streampipes/pipeline_api.go
+++ b/streampipes-client-go/streampipes/pipeline_api.go
@@ -18,15 +18,14 @@
 package streampipes
 
 import (
-       "fmt"
-       "io"
-       "log"
-       "net/http"
        "github.com/apache/streampipes/streampipes-client-go/streampipes/config"
        
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/serializer"
        
"github.com/apache/streampipes/streampipes-client-go/streampipes/internal/util"
        "github.com/apache/streampipes/streampipes-client-go/streampipes/model"
        
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/pipeline"
+       "io"
+       "log"
+       "net/http"
 )
 
 type Pipeline struct {
@@ -156,13 +155,9 @@ func (p *Pipeline) GetAllPipeline() ([]pipeline.Pipeline, 
error) {
 }
 
 // CreatePipeline store a new pipeline
-func (p *Pipeline) CreatePipeline(pp pipeline.Pipeline) 
(model.ResponseMessage, error) {
+func (p *Pipeline) CreatePipeline(body []byte) (model.ResponseMessage, error) {
        endPointUrl := util.NewStreamPipesApiPath(p.config.Url, 
"streampipes-backend/api/v2/pipelines", nil)
 
-       body, err := serializer.NewPipelineSerializer().Marshal(pp)
-       if err != nil {
-               return model.ResponseMessage{}, err
-       }
        response, err := p.executeRequest("POST", endPointUrl, body)
        if err != nil {
                return model.ResponseMessage{}, err
@@ -180,7 +175,6 @@ func (p *Pipeline) CreatePipeline(pp pipeline.Pipeline) 
(model.ResponseMessage,
 
        unmarshalData, err := 
serializer.NewResponseMessageDeserializer().Unmarshal(data)
        if err != nil {
-               fmt.Println(err, 11)
                return model.ResponseMessage{}, err
        }
        message := unmarshalData.(model.ResponseMessage)

Reply via email to