This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 18dfc84b81 feat(#3483): Use Kraft-based Kafka as default for internal 
messaging (#3484)
18dfc84b81 is described below

commit 18dfc84b815959bcd39c0e9ae2ec516591a2f32d
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Feb 20 22:56:48 2025 +0100

    feat(#3483): Use Kraft-based Kafka as default for internal messaging (#3484)
    
    * feat(#3483): Use Kraft-based Kafka as default for internal messaging
    
    * Let Kafka consumer wait until rebalancing is finished
    
    * Update helm charts
---
 docker-compose.yml                                 |  36 +++----
 .../deploy/standalone/kafka/docker-compose.dev.yml |  26 ++---
 .../cli/deploy/standalone/kafka/docker-compose.yml |  31 +++---
 installer/compose/docker-compose.full.yml          |  41 +++-----
 installer/compose/docker-compose.quickstart.yml    |  39 +++-----
 installer/compose/docker-compose.yml               |  39 +++-----
 installer/k8s/README.md                            |  16 +--
 .../k8s/templates/core/backend-deployment.yaml     |   7 --
 .../templates/external/kafka/kafka-deployment.yaml |  44 ++++----
 .../external/zookeeper/zookeeper-deployment.yaml   |  74 --------------
 .../external/zookeeper/zookeeper-pvc.yaml          |  44 --------
 .../external/zookeeper/zookeeper-service.yaml      |  29 ------
 installer/k8s/values.yaml                          |  12 ---
 .../connectors/kafka/adapter/KafkaProtocol.java    |   8 +-
 .../messaging/kafka/SpKafkaConsumer.java           | 111 +++++++++++----------
 .../model/datalake/DataExplorerWidgetModel.java    |   2 +
 .../model/grounding/KafkaTransportProtocol.java    |  40 --------
 .../manager/matching/ProtocolSelector.java         |   4 +-
 .../streampipes/manager/matching/v2/TestUtils.java |   2 +-
 .../apache/streampipes/sdk/helpers/Protocols.java  |   2 +-
 .../src/lib/model/gen/streampipes-model.ts         |  85 +---------------
 21 files changed, 174 insertions(+), 518 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 50f7a71506..e15d439227 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -84,33 +84,26 @@ services:
       spnet:
 
   kafka:
-    image: fogsyio/kafka:2.2.0
+    image: bitnami/kafka:3.9.0
     hostname: kafka
-    depends_on:
-      - zookeeper
     environment:
-      # see: https://github.com/confluentinc/schema-registry/issues/648
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
-      KAFKA_LISTENERS: PLAINTEXT://:9092
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_ADVERTISED_HOST_NAME: kafka
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      - 
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - 
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
     volumes:
-      - kafka:/kafka
-      - /var/run/docker.sock:/var/run/docker.sock
+      - kafka3:/bitnami
     logging: *default-logging
     networks:
       spnet:
 
-  zookeeper:
-    image: fogsyio/zookeeper:3.4.13
-    logging: *default-logging
-    volumes:
-      - zookeeper:/opt/zookeeper-3.4.13
-    networks:
-      spnet:
-
   influxdb:
     image: influxdb:2.6
     environment:
@@ -144,10 +137,9 @@ services:
       spnet:
 
 volumes:
-  kafka:
+  kafka3:
   files:
   couchdb:
-  zookeeper:
   influxdb:
   influxdb2:
   backend:
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml 
b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
index 2410930d61..8ac1f2e8ba 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
@@ -17,16 +17,18 @@ services:
   kafka:
     ports:
       - "9094:9094"
-    depends_on:
-      - zookeeper
     environment:
-      # see: https://github.com/confluentinc/schema-registry/issues/648
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 # 
 Replace localhost with your external address if Kafka should be reachable from 
external systems.
-      KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_ADVERTISED_HOST_NAME: kafka
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
+      # KRaft settings
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      # Listeners
+      - 
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - 
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.yml 
b/installer/cli/deploy/standalone/kafka/docker-compose.yml
index c3df0c7e0c..a4d689d2e1 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.yml
@@ -15,24 +15,23 @@
 
 services:
   kafka:
-    image: fogsyio/kafka:2.2.0
+    image: bitnami/kafka:3.9.0
     hostname: kafka
-    depends_on:
-      - zookeeper
     environment:
-      # see: https://github.com/confluentinc/schema-registry/issues/648
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
-      KAFKA_LISTENERS: PLAINTEXT://:9092
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_ADVERTISED_HOST_NAME: kafka
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      # Listeners
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
+      - 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
     volumes:
-      - kafka:/kafka
-      - /var/run/docker.sock:/var/run/docker.sock
+      - kafka3:/bitnami
     logging:
       driver: "json-file"
       options:
@@ -42,7 +41,7 @@ services:
       spnet:
 
 volumes:
-  kafka:
+  kafka3:
 
 networks:
   spnet:
diff --git a/installer/compose/docker-compose.full.yml 
b/installer/compose/docker-compose.full.yml
index 2ad26d92cd..47d12fff2e 100644
--- a/installer/compose/docker-compose.full.yml
+++ b/installer/compose/docker-compose.full.yml
@@ -65,32 +65,22 @@ services:
       spnet:
 
   kafka:
-    image: fogsyio/kafka:2.2.0
+    image: bitnami/kafka:3.9.0
     hostname: kafka
-    depends_on:
-      - zookeeper
     environment:
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 # 
 Replace localhost with your external address if Kafka should be reachable from 
external systems.
-      KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_ADVERTISED_HOST_NAME: kafka
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      - 
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - 
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
     volumes:
-      - kafka:/kafka
-      - /var/run/docker.sock:/var/run/docker.sock
-    logging: *default-logging
-    restart: unless-stopped
-    networks:
-      spnet:
-
-  zookeeper:
-    image: fogsyio/zookeeper:3.4.13
-    volumes:
-      - zookeeper:/opt/zookeeper-3.4.13
+      - kafka3:/bitnami
     logging: *default-logging
     restart: unless-stopped
     networks:
@@ -155,14 +145,11 @@ volumes:
   backend:
   connect:
   couchdb:
-  kafka:
-  zookeeper:
+  kafka3:
   influxdb:
   influxdb2:
   files:
   nginx:
-  
-
 
 networks:
   spnet:
diff --git a/installer/compose/docker-compose.quickstart.yml 
b/installer/compose/docker-compose.quickstart.yml
index 175d931f0d..915c7901eb 100644
--- a/installer/compose/docker-compose.quickstart.yml
+++ b/installer/compose/docker-compose.quickstart.yml
@@ -65,32 +65,22 @@ services:
       spnet:
 
   kafka:
-    image: fogsyio/kafka:2.2.0
+    image: bitnami/kafka:3.9.0
     hostname: kafka
-    depends_on:
-      - zookeeper
     environment:
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 # 
 Replace localhost with your external address if Kafka should be reachable from 
external systems.
-      KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_ADVERTISED_HOST_NAME: kafka
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
-    volumes:
-      - kafka:/kafka
-      - /var/run/docker.sock:/var/run/docker.sock
-    logging: *default-logging
-    restart: unless-stopped
-    networks:
-      spnet:
-
-  zookeeper:
-    image: fogsyio/zookeeper:3.4.13
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      - 
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - 
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
     volumes:
-      - zookeeper:/opt/zookeeper-3.4.13
+      - kafka3:/bitnami
     logging: *default-logging
     restart: unless-stopped
     networks:
@@ -141,8 +131,7 @@ volumes:
   backend:
   connect:
   couchdb:
-  kafka:
-  zookeeper:
+  kafka3:
   influxdb:
   influxdb2:
   files:
diff --git a/installer/compose/docker-compose.yml 
b/installer/compose/docker-compose.yml
index 6157972476..5c8a9e3c77 100644
--- a/installer/compose/docker-compose.yml
+++ b/installer/compose/docker-compose.yml
@@ -65,32 +65,22 @@ services:
       spnet:
 
   kafka:
-    image: fogsyio/kafka:2.2.0
+    image: bitnami/kafka:3.9.0
     hostname: kafka
-    depends_on:
-      - zookeeper
     environment:
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 # 
 Replace localhost with your external address if Kafka should be reachable from 
external systems.
-      KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_ADVERTISED_HOST_NAME: kafka
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
-    volumes:
-      - kafka:/kafka
-      - /var/run/docker.sock:/var/run/docker.sock
-    logging: *default-logging
-    restart: unless-stopped
-    networks:
-      spnet:
-
-  zookeeper:
-    image: fogsyio/zookeeper:3.4.13
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      - 
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+      - 
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+      - 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
     volumes:
-      - zookeeper:/opt/zookeeper-3.4.13
+      - kafka3:/bitnami
     logging: *default-logging
     restart: unless-stopped
     networks:
@@ -131,8 +121,7 @@ volumes:
   backend:
   connect:
   couchdb:
-  kafka:
-  zookeeper:
+  kafka3:
   influxdb:
   influxdb2:
   files:
diff --git a/installer/k8s/README.md b/installer/k8s/README.md
index a093c40d11..7693cd2667 100644
--- a/installer/k8s/README.md
+++ b/installer/k8s/README.md
@@ -225,7 +225,7 @@ rm -rf ${HOME}/streampipes-k8s
 | Parameter Name                              | Description                    
                                                          | Value       |
 
|---------------------------------------------|------------------------------------------------------------------------------------------|-------------|
 | external.kafka.appName                      | Kafka application name         
                                                          | "kafka"     |
-| external.kafka.version                      | Kafka version                  
                                                          | 2.2.0       |
+| external.kafka.version                      | Kafka version                  
                                                          | 3.9.0       |
 | external.kafka.port                         | Port for the Kafka service     
                                                          | 9092        |
 | external.kafka.external.hostname            | Name which will be advertised 
to external clients. Clients which use (default) port 9094 | "localhost" |
 | external.kafka.service.name                 | Name of the Kafka service      
                                                          | "kafka"     |
@@ -237,20 +237,6 @@ rm -rf ${HOME}/streampipes-k8s
 | external.kafka.persistence.pvName           | Name of the Kafka 
PersistentVolume                                                       | 
"kafka-pv"  |
 |
 
-#### Zookeeper common parameters
-
-| Parameter Name                                  | Description                
                 | Value           |
-|-------------------------------------------------|---------------------------------------------|-----------------|
-| external.zookeeper.appName                      | ZooKeeper application name 
                 | "zookeeper"     |
-| external.zookeeper.version                      | ZooKeeper version          
                 | 3.4.13          |
-| external.zookeeper.port                         | Port for the ZooKeeper 
service              | 2181            |
-| external.zookeeper.service.name                 | Name of the ZooKeeper 
service               | "zookeeper"     |
-| external.zookeeper.service.port                 | TargetPort of the 
ZooKeeper service         | 2181            |
-| external.zookeeper.persistence.storageClassName | Storage class name for 
ZooKeeper PVs        | "hostpath"      |
-| external.zookeeper.persistence.storageSize      | Size of the ZooKeeper PV   
                 | "1Gi"           |
-| external.zookeeper.persistence.claimName        | Name of the ZooKeeper 
PersistentVolumeClaim | "zookeeper-pvc" |
-| external.zookeeper.persistence.pvName           | Name of the ZooKeeper 
PersistentVolume      | "zookeeper-pv"  |
-
 #### Pulsar common parameters
 
 | Parameter Name                               | Description                   
           | Value        |
diff --git a/installer/k8s/templates/core/backend-deployment.yaml 
b/installer/k8s/templates/core/backend-deployment.yaml
index 96d09ee9c1..8103e20ac6 100644
--- a/installer/k8s/templates/core/backend-deployment.yaml
+++ b/installer/k8s/templates/core/backend-deployment.yaml
@@ -59,13 +59,6 @@ spec:
               value: "{{ .Values.external.kafka.service.name }}"
             - name: SP_KAFKA_PORT
               value: "{{ .Values.external.kafka.service.port }}"
-            - name: SP_ZOOKEEPER_HOST
-              value: "{{ .Values.external.zookeeper.service.name }}"
-            - name: SP_ZOOKEEPER_PORT
-              value: "{{ .Values.external.zookeeper.service.port }}"
-            {{- end }}
-            {{- if eq .Values.preferredBroker "nats" }}
-              value: "nats"
             - name: SP_NATS_HOST
               value: "{{ .Values.external.nats.service.name }}"
             - name: SP_NATS_PORT
diff --git a/installer/k8s/templates/external/kafka/kafka-deployment.yaml 
b/installer/k8s/templates/external/kafka/kafka-deployment.yaml
index 76ac96f049..659d56d148 100644
--- a/installer/k8s/templates/external/kafka/kafka-deployment.yaml
+++ b/installer/k8s/templates/external/kafka/kafka-deployment.yaml
@@ -29,17 +29,13 @@ spec:
         app: {{ .Values.external.kafka.appName }}
     spec:
       restartPolicy: {{ .Values.restartPolicy }}
-      initContainers:
-        - name: init-wait
-          image: alpine
-          command: ["sh", "-c", "for i in $(seq 1 300); do nc -zvw1 {{ 
.Values.external.zookeeper.service.name }} {{ 
.Values.external.zookeeper.service.port }} && exit 0 || sleep 3; done; exit 1"]
       volumes:
         - name: {{ .Values.external.kafka.persistence.pvName }}
           persistentVolumeClaim:
             claimName: {{ .Values.external.kafka.persistence.claimName }}
       containers:
         - name: {{ .Values.external.kafka.appName }}
-          image: fogsyio/kafka:{{ .Values.external.kafka.version }}
+          image: bitnami/kafka:{{ .Values.external.kafka.version }}
           imagePullPolicy: {{ .Values.pullPolicy }}
           ports:
             - containerPort: {{ .Values.external.kafka.port }}
@@ -47,25 +43,29 @@ spec:
             - mountPath: "/kafka"
               name: {{ .Values.external.kafka.persistence.pvName }}
           env:
-            # Known issue with kafka running in kubernetes:
-            # https://github.com/wurstmeister/kafka-docker/issues/122
-            - name: KAFKA_PORT
-              value: "{{ .Values.external.kafka.port }}"
-            - name: KAFKA_ZOOKEEPER_CONNECT
-              value: "zookeeper:{{ .Values.external.zookeeper.port }}"
-            - name: KAFKA_LISTENERS
-              value: "PLAINTEXT://:{{ .Values.external.kafka.port 
}},OUTSIDE://:9094"
-            - name: KAFKA_ADVERTISED_LISTENERS
-              value: "PLAINTEXT://kafka:{{ .Values.external.kafka.port 
}},OUTSIDE://{{ .Values.external.kafka.external.hostname }}:9094"
-            - name: KAFKA_INTER_BROKER_LISTENER_NAME
+            - name: KAFKA_CFG_NODE_ID
+              value: "0"
+            - name: KAFKA_CFG_PROCESS_ROLES
+              value: "controller,broker"
+            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
+              value: "0@kafka:9093"
+            - name: KAFKA_CFG_LISTENERS
+              value: "PLAINTEXT://:{{ .Values.external.kafka.port 
}},CONTROLLER://:9093,OUTSIDE://:9094"
+            - name: KAFKA_CFG_ADVERTISED_LISTENERS
+              value: "PLAINTEXT://{{ .Values.external.kafka.port 
}}:9092,OUTSIDE://{{ .Values.external.kafka.external.hostname }}:9094"
+            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
+              value: 
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT"
+            - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
               value: "PLAINTEXT"
-            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
-              value: "PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT"
-            - name: KAFKA_MESSAGE_MAX_BYTES
+            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
+              value: "CONTROLLER"
+            - name: KAFKA_CFG_PORT
+              value: "{{ .Values.external.kafka.port }}"
+            - name: KAFKA_CFG_MESSAGE_MAX_BYTES
               value: "5000012"
-            - name: KAFKA_FETCH_MESSAGE_MAX_BYTES
+            - name: KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES
               value: "5000012"
-            - name: KAFKA_REPLICA_FETCH_MAX_BYTES
+            - name: KAFKA_CFG_REPLICA_FETCH_MAX_BYTES
               value: "10000000"
           livenessProbe:
             tcpSocket:
@@ -85,4 +85,4 @@ spec:
             initialDelaySeconds: {{ .Values.initialDelaySeconds }}
             periodSeconds: {{ .Values.periodSeconds }}
             failureThreshold: {{ .Values.failureThreshold }}
-{{- end }}
\ No newline at end of file
+{{- end }}
diff --git 
a/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml 
b/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml
deleted file mode 100644
index 07314cdeb3..0000000000
--- a/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml
+++ /dev/null
@@ -1,74 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{{- if eq .Values.preferredBroker "kafka" }}
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  name: {{ .Values.external.zookeeper.appName }}
-spec:
-  selector:
-    matchLabels:
-      app: {{ .Values.external.zookeeper.appName }}
-  replicas: 1
-  template:
-    metadata:
-      labels:
-        app: {{ .Values.external.zookeeper.appName }}
-    spec:
-      restartPolicy: {{ .Values.restartPolicy }}
-      volumes:
-        - name: {{ .Values.external.zookeeper.persistence.pvName }}
-          persistentVolumeClaim:
-            claimName: {{ .Values.external.zookeeper.persistence.claimName }}
-      containers:
-        #TODO: wurstmeister/zookeeper:latest is running ZK 3.4.13. Once this
-        #      changes, the mount path needs to be adapted
-        - name: {{ .Values.external.zookeeper.appName }}
-          image: fogsyio/zookeeper:{{ .Values.external.zookeeper.version }}
-          imagePullPolicy: {{ .Values.pullPolicy }}
-          ports:
-            - containerPort: {{ .Values.external.zookeeper.port }}
-          volumeMounts:
-            - mountPath: "/opt/zookeeper-{{ .Values.external.zookeeper.version 
}}/data"
-              name: {{ .Values.external.zookeeper.persistence.pvName }}
-          livenessProbe:
-            exec:
-              command:
-                - sh
-                - -c
-                - echo ruok | nc localhost {{ .Values.external.zookeeper.port 
}}
-            initialDelaySeconds: {{ .Values.initialDelaySeconds }}
-            periodSeconds: {{ .Values.periodSeconds }}
-            failureThreshold: {{ .Values.failureThreshold }}
-          readinessProbe:
-            exec:
-              command:
-                - sh
-                - -c
-                - echo ruok | nc localhost {{ .Values.external.zookeeper.port 
}}
-            initialDelaySeconds: {{ .Values.initialDelaySeconds }}
-            periodSeconds: {{ .Values.periodSeconds }}
-            failureThreshold: {{ .Values.failureThreshold }}
-          startupProbe:
-            exec:
-              command:
-                - sh
-                - -c
-                - echo ruok | nc localhost {{ .Values.external.zookeeper.port 
}}
-            initialDelaySeconds: {{ .Values.initialDelaySeconds }}
-            periodSeconds: {{ .Values.periodSeconds }}
-            failureThreshold: {{ .Values.failureThreshold }}
-{{- end }}
\ No newline at end of file
diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml 
b/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml
deleted file mode 100644
index 465963d6e8..0000000000
--- a/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{{- if eq .Values.preferredBroker "kafka" }}
-apiVersion: v1
-kind: PersistentVolume
-metadata:
-  name: {{ .Values.external.zookeeper.persistence.pvName }}
-spec:
-  storageClassName: {{ .Values.external.zookeeper.persistence.storageClassName 
}}
-  capacity:
-    storage: {{ .Values.external.zookeeper.persistence.storageSize }}
-  accessModes:
-    - {{ .Values.persistentVolumeAccessModes }}
-  persistentVolumeReclaimPolicy: {{ .Values.persistentVolumeReclaimPolicy }}
-  hostPath:
-    path: {{ .Values.hostPath }}/zookeeper
----
-apiVersion: v1
-kind: PersistentVolumeClaim
-metadata:
-  labels:
-    app: {{ .Values.external.zookeeper.appName }}
-  name: {{ .Values.external.zookeeper.persistence.claimName }}
-spec:
-  storageClassName: {{ .Values.external.zookeeper.persistence.storageClassName 
}}
-  accessModes:
-    - {{ .Values.persistentVolumeAccessModes }}
-  resources:
-    requests:
-      storage: {{ .Values.external.zookeeper.persistence.storageSize }}
-{{- end }}
\ No newline at end of file
diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml 
b/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml
deleted file mode 100644
index 492d0558ea..0000000000
--- a/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{{- if eq .Values.preferredBroker "kafka" }}
-apiVersion: v1
-kind: Service
-metadata:
-  name: {{ .Values.external.zookeeper.service.name }}
-spec:
-  selector:
-    app: {{ .Values.external.zookeeper.appName }}
-  ports:
-    - name: main
-      protocol: TCP
-      port: {{ .Values.external.zookeeper.port }}
-      targetPort: {{ .Values.external.zookeeper.port }}
-{{- end }}
\ No newline at end of file
diff --git a/installer/k8s/values.yaml b/installer/k8s/values.yaml
index 68b7e2e95c..e6601bf64d 100644
--- a/installer/k8s/values.yaml
+++ b/installer/k8s/values.yaml
@@ -152,18 +152,6 @@ external:
       storageSize: "1Gi"
       claimName: "kafka-pvc"
       pvName: "kafka-pv"
-  zookeeper:
-    appName: "zookeeper"
-    version: 3.4.13
-    port: 2181
-    service:
-      name: "zookeeper"
-      port: 2181
-    persistence:
-      storageClassName: "hostpath"
-      storageSize: "1Gi"
-      claimName: "zookeeper-pvc"
-      pvName: "zookeeper-pv"
   pulsar:
     appName: "pulsar"
     version: 3.0.0
diff --git 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
index 0e00e94b01..ce10fcb759 100644
--- 
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
+++ 
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
@@ -78,7 +78,6 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
 
   public static final String ID = 
"org.apache.streampipes.connect.iiot.protocol.stream.kafka";
 
-  private Thread thread;
   private SpKafkaConsumer kafkaConsumer;
 
   public KafkaProtocol() {
@@ -190,13 +189,9 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
     protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic()));
 
     this.kafkaConsumer = new SpKafkaConsumer(protocol,
-        config.getTopic(),
-        new BrokerEventProcessor(extractor.selectedParser(), collector),
         config.getConfigAppenders()
     );
-
-    thread = new Thread(this.kafkaConsumer);
-    thread.start();
+    this.kafkaConsumer.connect(new 
BrokerEventProcessor(extractor.selectedParser(), collector));
   }
 
   @Override
@@ -215,7 +210,6 @@ public class KafkaProtocol implements StreamPipesAdapter, 
SupportsRuntimeConfig
     }
 
     LOG.info("Kafka Adapter was sucessfully stopped");
-    thread.interrupt();
   }
 
   @Override
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index 6b8f1bbc23..8413452765 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -41,6 +41,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 public class SpKafkaConsumer implements EventConsumer, Runnable,
@@ -50,56 +52,25 @@ public class SpKafkaConsumer implements EventConsumer, 
Runnable,
   private InternalEventProcessor<byte[]> eventProcessor;
   private final KafkaTransportProtocol protocol;
   private volatile boolean isRunning;
-  private Boolean patternTopic = false;
 
   private List<KafkaConfigAppender> appenders = new ArrayList<>();
+  private KafkaConsumer<byte[], byte[]> consumer;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SpKafkaConsumer.class);
 
   public SpKafkaConsumer(KafkaTransportProtocol protocol) {
     this.protocol = protocol;
+    this.topic = protocol.getTopicDefinition().getActualTopicName();
   }
 
   public SpKafkaConsumer(KafkaTransportProtocol protocol,
-                         String topic,
-                         InternalEventProcessor<byte[]> eventProcessor) {
-    this.protocol = protocol;
-    this.topic = topic;
-    this.eventProcessor = eventProcessor;
-    this.isRunning = true;
-  }
-
-  public SpKafkaConsumer(KafkaTransportProtocol protocol,
-                         String topic,
-                         InternalEventProcessor<byte[]> eventProcessor,
                          List<KafkaConfigAppender> appenders) {
-    this(protocol, topic, eventProcessor);
+    this(protocol);
     this.appenders = appenders;
   }
 
   @Override
   public void run() {
-
-    Properties props = makeProperties(protocol, appenders);
-
-    LOG.info("Using kafka properties: {}", props.toString());
-    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
-    if (!patternTopic) {
-      consumer.subscribe(Collections.singletonList(topic));
-    } else {
-      topic = replaceWildcardWithPatternFormat(topic);
-      consumer.subscribe(Pattern.compile(topic), new 
ConsumerRebalanceListener() {
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
-          // TODO
-        }
-
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
-          // TODO
-        }
-      });
-    }
     Duration duration = Duration.of(100, ChronoUnit.MILLIS);
     while (isRunning) {
       ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
@@ -108,40 +79,76 @@ public class SpKafkaConsumer implements EventConsumer, 
Runnable,
     consumer.close();
   }
 
-  private String replaceWildcardWithPatternFormat(String topic) {
-    topic = topic.replaceAll("\\.", "\\\\.");
-    return topic.replaceAll("\\*", ".*");
-  }
-
-  private Properties makeProperties(KafkaTransportProtocol protocol,
-                                    List<KafkaConfigAppender> appenders) {
-    return new ConsumerConfigFactory(protocol).buildProperties(appenders);
-  }
-
   @Override
   public void connect(InternalEventProcessor<byte[]> eventProcessor) throws 
SpRuntimeException {
-    LOG.info("Kafka consumer: Connecting to " + 
protocol.getTopicDefinition().getActualTopicName());
-    if (protocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
-      this.patternTopic = true;
-    }
+    LOG.info("Kafka consumer: Connecting to {}", 
protocol.getTopicDefinition().getActualTopicName());
+    var patternTopic = isPatternTopic();
     this.eventProcessor = eventProcessor;
-
-    this.topic = protocol.getTopicDefinition().getActualTopicName();
     this.isRunning = true;
+    Properties props = makeProperties(protocol, appenders);
+
+    consumer = new KafkaConsumer<>(props);
+    var latch = new CountDownLatch(1);
+    if (!patternTopic) {
+      consumer.subscribe(Collections.singletonList(topic), new 
RebalanceListener(latch));
+    } else {
+      topic = replaceWildcardWithPatternFormat(topic);
+      consumer.subscribe(Pattern.compile(topic), new RebalanceListener(latch));
+    }
 
     Thread thread = new Thread(this);
     thread.start();
+    try {
+      if (!latch.await(10, TimeUnit.SECONDS)) {
+        throw new SpRuntimeException("Timeout while waiting for partition 
assignment");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SpRuntimeException("Interrupted while waiting for partition 
assignment", e);
+    }
   }
 
   @Override
   public void disconnect() throws SpRuntimeException {
-    LOG.info("Kafka consumer: Disconnecting from " + topic);
+    LOG.info("Kafka consumer: Disconnecting from {}", topic);
     this.isRunning = false;
-
   }
 
   @Override
   public boolean isConnected() {
     return isRunning;
   }
+
+  private boolean isPatternTopic() {
+    return this.protocol.getTopicDefinition() instanceof 
WildcardTopicDefinition;
+  }
+
+  private String replaceWildcardWithPatternFormat(String topic) {
+    topic = topic.replaceAll("\\.", "\\\\.");
+    return topic.replaceAll("\\*", ".*");
+  }
+
+  private Properties makeProperties(KafkaTransportProtocol protocol,
+                                    List<KafkaConfigAppender> appenders) {
+    return new ConsumerConfigFactory(protocol).buildProperties(appenders);
+  }
+
+  private class RebalanceListener implements ConsumerRebalanceListener {
+
+    private final CountDownLatch latch;
+    public RebalanceListener(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
+      consumer.pause(collection);
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+      consumer.resume(partitions);
+      latch.countDown();
+    }
+  }
 }
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java
index 02b244d22c..8e302ae163 100644
--- 
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java
@@ -19,12 +19,14 @@
 package org.apache.streampipes.model.datalake;
 
 import org.apache.streampipes.model.dashboard.DashboardEntity;
+import org.apache.streampipes.model.shared.annotation.TsModel;
 
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.HashMap;
 import java.util.Map;
 
+@TsModel
 public class DataExplorerWidgetModel extends DashboardEntity {
 
   private String widgetId;
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
index 9fe5228abe..8af85bdd56 100644
--- 
a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
@@ -20,12 +20,6 @@ package org.apache.streampipes.model.grounding;
 
 public class KafkaTransportProtocol extends TransportProtocol {
 
-  private static final long serialVersionUID = -4067982203807146257L;
-
-  private String zookeeperHost;
-
-  private int zookeeperPort;
-
   private int kafkaPort;
 
   private Integer lingerMs;
@@ -44,24 +38,12 @@ public class KafkaTransportProtocol extends 
TransportProtocol {
 
   public KafkaTransportProtocol(String kafkaHost, int kafkaPort, String topic) 
{
     super(kafkaHost, new SimpleTopicDefinition(topic));
-    this.zookeeperHost = kafkaHost;
-    this.zookeeperPort = kafkaPort;
-    this.kafkaPort = kafkaPort;
-  }
-
-  public KafkaTransportProtocol(String kafkaHost, int kafkaPort, String topic, 
String zookeeperHost,
-                                int zookeeperPort) {
-    super(kafkaHost, new SimpleTopicDefinition(topic));
-    this.zookeeperHost = zookeeperHost;
-    this.zookeeperPort = zookeeperPort;
     this.kafkaPort = kafkaPort;
   }
 
   public KafkaTransportProtocol(KafkaTransportProtocol other) {
     super(other);
     this.kafkaPort = other.getKafkaPort();
-    this.zookeeperHost = other.getZookeeperHost();
-    this.zookeeperPort = other.getZookeeperPort();
     this.acks = other.getAcks();
     this.batchSize = other.getBatchSize();
     this.groupId = other.getGroupId();
@@ -74,34 +56,12 @@ public class KafkaTransportProtocol extends 
TransportProtocol {
   public KafkaTransportProtocol(String kafkaHost, Integer kafkaPort, 
WildcardTopicDefinition wildcardTopicDefinition) {
     super(kafkaHost, wildcardTopicDefinition);
     this.kafkaPort = kafkaPort;
-    this.zookeeperHost = kafkaHost;
-    this.zookeeperPort = kafkaPort;
   }
 
   public KafkaTransportProtocol() {
     super();
   }
 
-  public static long getSerialVersionUID() {
-    return serialVersionUID;
-  }
-
-  public String getZookeeperHost() {
-    return zookeeperHost;
-  }
-
-  public void setZookeeperHost(String zookeeperHost) {
-    this.zookeeperHost = zookeeperHost;
-  }
-
-  public int getZookeeperPort() {
-    return zookeeperPort;
-  }
-
-  public void setZookeeperPort(int zookeeperPort) {
-    this.zookeeperPort = zookeeperPort;
-  }
-
   public int getKafkaPort() {
     return kafkaPort;
   }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index e1eb685f6d..e1eeb4f58f 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -114,9 +114,7 @@ public class ProtocolSelector extends GroundingSelector {
     return new KafkaTransportProtocol(
         messagingSettings.getKafkaHost(),
         messagingSettings.getKafkaPort(),
-        outputTopic,
-        messagingSettings.getZookeeperHost(),
-        messagingSettings.getZookeeperPort()
+        outputTopic
     );
   }
 
diff --git 
a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
 
b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
index 78d291a369..ddfee95cdc 100644
--- 
a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
+++ 
b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
@@ -25,7 +25,7 @@ import 
org.apache.streampipes.model.grounding.TransportProtocol;
 public class TestUtils {
 
   public static TransportProtocol kafkaProtocol() {
-    return new KafkaTransportProtocol("localhost", 9092, "abc", "localhost", 
2181);
+    return new KafkaTransportProtocol("localhost", 9092, "abc");
   }
 
   public static TransportProtocol jmsProtocol() {
diff --git 
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
 
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
index f9c25f11c8..f20e15f40f 100644
--- 
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
+++ 
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
@@ -36,7 +36,7 @@ public class Protocols {
    * containing URL and topic where data arrives.
    */
   public static KafkaTransportProtocol kafka(String kafkaHost, Integer 
kafkaPort, String topic) {
-    return new KafkaTransportProtocol(kafkaHost, kafkaPort, topic, kafkaHost, 
kafkaPort);
+    return new KafkaTransportProtocol(kafkaHost, kafkaPort, topic);
   }
 
   /**
diff --git 
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
 
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index 31f14e6b19..03013586f6 100644
--- 
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++ 
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -20,7 +20,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 3.2.1263 on 2025-01-25 
23:02:05.
+// Generated using typescript-generator version 3.2.1263 on 2025-02-14 
21:48:17.
 
 export class NamedStreamPipesEntity implements Storable {
     '@class':
@@ -1188,61 +1188,6 @@ export class DashboardModel implements Storable {
     }
 }
 
-export class DashboardWidgetModel extends DashboardEntity {
-    dashboardWidgetSettings: DashboardWidgetSettings;
-    pipelineId: string;
-    visualizationName: string;
-    widgetId: string;
-    widgetType: string;
-
-    static fromData(
-        data: DashboardWidgetModel,
-        target?: DashboardWidgetModel,
-    ): DashboardWidgetModel {
-        if (!data) {
-            return data;
-        }
-        const instance = target || new DashboardWidgetModel();
-        super.fromData(data, instance);
-        instance.dashboardWidgetSettings = DashboardWidgetSettings.fromData(
-            data.dashboardWidgetSettings,
-        );
-        instance.pipelineId = data.pipelineId;
-        instance.visualizationName = data.visualizationName;
-        instance.widgetId = data.widgetId;
-        instance.widgetType = data.widgetType;
-        return instance;
-    }
-}
-
-export class DashboardWidgetSettings {
-    config: StaticPropertyUnion[];
-    requiredSchema: EventSchema;
-    widgetDescription: string;
-    widgetIconName: string;
-    widgetLabel: string;
-    widgetName: string;
-
-    static fromData(
-        data: DashboardWidgetSettings,
-        target?: DashboardWidgetSettings,
-    ): DashboardWidgetSettings {
-        if (!data) {
-            return data;
-        }
-        const instance = target || new DashboardWidgetSettings();
-        instance.config = __getCopyArrayFn(StaticProperty.fromDataUnion)(
-            data.config,
-        );
-        instance.requiredSchema = EventSchema.fromData(data.requiredSchema);
-        instance.widgetDescription = data.widgetDescription;
-        instance.widgetIconName = data.widgetIconName;
-        instance.widgetLabel = data.widgetLabel;
-        instance.widgetName = data.widgetName;
-        return instance;
-    }
-}
-
 export class DataExplorerWidgetModel extends DashboardEntity {
     baseAppearanceConfig: { [index: string]: any };
     dataConfig: { [index: string]: any };
@@ -2208,8 +2153,6 @@ export class KafkaTransportProtocol extends 
TransportProtocol {
     'maxRequestSize': string;
     'messageMaxBytes': string;
     'offset': string;
-    'zookeeperHost': string;
-    'zookeeperPort': number;
 
     static 'fromData'(
         data: KafkaTransportProtocol,
@@ -2228,8 +2171,6 @@ export class KafkaTransportProtocol extends 
TransportProtocol {
         instance.maxRequestSize = data.maxRequestSize;
         instance.messageMaxBytes = data.messageMaxBytes;
         instance.offset = data.offset;
-        instance.zookeeperHost = data.zookeeperHost;
-        instance.zookeeperPort = data.zookeeperPort;
         return instance;
     }
 }
@@ -4139,30 +4080,6 @@ export class UserInfo {
     }
 }
 
-export class VisualizablePipeline {
-    pipelineId: string;
-    pipelineName: string;
-    schema: EventSchema;
-    topic: string;
-    visualizationName: string;
-
-    static fromData(
-        data: VisualizablePipeline,
-        target?: VisualizablePipeline,
-    ): VisualizablePipeline {
-        if (!data) {
-            return data;
-        }
-        const instance = target || new VisualizablePipeline();
-        instance.pipelineId = data.pipelineId;
-        instance.pipelineName = data.pipelineName;
-        instance.schema = EventSchema.fromData(data.schema);
-        instance.topic = data.topic;
-        instance.visualizationName = data.visualizationName;
-        return instance;
-    }
-}
-
 export class WildcardTopicDefinition extends TopicDefinition {
     '@class': 'org.apache.streampipes.model.grounding.WildcardTopicDefinition';
     'wildcardTopicMappings': WildcardTopicMapping[];


Reply via email to