This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 18dfc84b81 feat(#3483): Use KRaft-based Kafka as default for internal
messaging (#3484)
18dfc84b81 is described below
commit 18dfc84b815959bcd39c0e9ae2ec516591a2f32d
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Feb 20 22:56:48 2025 +0100
feat(#3483): Use KRaft-based Kafka as default for internal messaging (#3484)
* feat(#3483): Use KRaft-based Kafka as default for internal messaging
* Let Kafka consumer wait until rebalancing is finished
* Update helm charts
---
docker-compose.yml | 36 +++----
.../deploy/standalone/kafka/docker-compose.dev.yml | 26 ++---
.../cli/deploy/standalone/kafka/docker-compose.yml | 31 +++---
installer/compose/docker-compose.full.yml | 41 +++-----
installer/compose/docker-compose.quickstart.yml | 39 +++-----
installer/compose/docker-compose.yml | 39 +++-----
installer/k8s/README.md | 16 +--
.../k8s/templates/core/backend-deployment.yaml | 7 --
.../templates/external/kafka/kafka-deployment.yaml | 44 ++++----
.../external/zookeeper/zookeeper-deployment.yaml | 74 --------------
.../external/zookeeper/zookeeper-pvc.yaml | 44 --------
.../external/zookeeper/zookeeper-service.yaml | 29 ------
installer/k8s/values.yaml | 12 ---
.../connectors/kafka/adapter/KafkaProtocol.java | 8 +-
.../messaging/kafka/SpKafkaConsumer.java | 111 +++++++++++----------
.../model/datalake/DataExplorerWidgetModel.java | 2 +
.../model/grounding/KafkaTransportProtocol.java | 40 --------
.../manager/matching/ProtocolSelector.java | 4 +-
.../streampipes/manager/matching/v2/TestUtils.java | 2 +-
.../apache/streampipes/sdk/helpers/Protocols.java | 2 +-
.../src/lib/model/gen/streampipes-model.ts | 85 +---------------
21 files changed, 174 insertions(+), 518 deletions(-)
diff --git a/docker-compose.yml b/docker-compose.yml
index 50f7a71506..e15d439227 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -84,33 +84,26 @@ services:
spnet:
kafka:
- image: fogsyio/kafka:2.2.0
+ image: bitnami/kafka:3.9.0
hostname: kafka
- depends_on:
- - zookeeper
environment:
- # see: https://github.com/confluentinc/schema-registry/issues/648
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
- KAFKA_LISTENERS: PLAINTEXT://:9092
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ -
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ -
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ -
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
+ - kafka3:/bitnami
logging: *default-logging
networks:
spnet:
- zookeeper:
- image: fogsyio/zookeeper:3.4.13
- logging: *default-logging
- volumes:
- - zookeeper:/opt/zookeeper-3.4.13
- networks:
- spnet:
-
influxdb:
image: influxdb:2.6
environment:
@@ -144,10 +137,9 @@ services:
spnet:
volumes:
- kafka:
+ kafka3:
files:
couchdb:
- zookeeper:
influxdb:
influxdb2:
backend:
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
index 2410930d61..8ac1f2e8ba 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.dev.yml
@@ -17,16 +17,18 @@ services:
kafka:
ports:
- "9094:9094"
- depends_on:
- - zookeeper
environment:
- # see: https://github.com/confluentinc/schema-registry/issues/648
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 #
Replace localhost with your external address if Kafka should be reachable from
external systems.
- KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
+ # KRaft settings
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ # Listeners
+ -
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ -
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ -
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+
diff --git a/installer/cli/deploy/standalone/kafka/docker-compose.yml
b/installer/cli/deploy/standalone/kafka/docker-compose.yml
index c3df0c7e0c..a4d689d2e1 100644
--- a/installer/cli/deploy/standalone/kafka/docker-compose.yml
+++ b/installer/cli/deploy/standalone/kafka/docker-compose.yml
@@ -15,24 +15,23 @@
services:
kafka:
- image: fogsyio/kafka:2.2.0
+ image: bitnami/kafka:3.9.0
hostname: kafka
- depends_on:
- - zookeeper
environment:
- # see: https://github.com/confluentinc/schema-registry/issues/648
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
- KAFKA_LISTENERS: PLAINTEXT://:9092
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ # Listeners
+ - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+ - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
+ -
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
+ - kafka3:/bitnami
logging:
driver: "json-file"
options:
@@ -42,7 +41,7 @@ services:
spnet:
volumes:
- kafka:
+ kafka3:
networks:
spnet:
diff --git a/installer/compose/docker-compose.full.yml
b/installer/compose/docker-compose.full.yml
index 2ad26d92cd..47d12fff2e 100644
--- a/installer/compose/docker-compose.full.yml
+++ b/installer/compose/docker-compose.full.yml
@@ -65,32 +65,22 @@ services:
spnet:
kafka:
- image: fogsyio/kafka:2.2.0
+ image: bitnami/kafka:3.9.0
hostname: kafka
- depends_on:
- - zookeeper
environment:
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 #
Replace localhost with your external address if Kafka should be reachable from
external systems.
- KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ -
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ -
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ -
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
- logging: *default-logging
- restart: unless-stopped
- networks:
- spnet:
-
- zookeeper:
- image: fogsyio/zookeeper:3.4.13
- volumes:
- - zookeeper:/opt/zookeeper-3.4.13
+ - kafka3:/bitnami
logging: *default-logging
restart: unless-stopped
networks:
@@ -155,14 +145,11 @@ volumes:
backend:
connect:
couchdb:
- kafka:
- zookeeper:
+ kafka3:
influxdb:
influxdb2:
files:
nginx:
-
-
networks:
spnet:
diff --git a/installer/compose/docker-compose.quickstart.yml
b/installer/compose/docker-compose.quickstart.yml
index 175d931f0d..915c7901eb 100644
--- a/installer/compose/docker-compose.quickstart.yml
+++ b/installer/compose/docker-compose.quickstart.yml
@@ -65,32 +65,22 @@ services:
spnet:
kafka:
- image: fogsyio/kafka:2.2.0
+ image: bitnami/kafka:3.9.0
hostname: kafka
- depends_on:
- - zookeeper
environment:
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 #
Replace localhost with your external address if Kafka should be reachable from
external systems.
- KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
- volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
- logging: *default-logging
- restart: unless-stopped
- networks:
- spnet:
-
- zookeeper:
- image: fogsyio/zookeeper:3.4.13
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ -
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ -
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ -
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
volumes:
- - zookeeper:/opt/zookeeper-3.4.13
+ - kafka3:/bitnami
logging: *default-logging
restart: unless-stopped
networks:
@@ -141,8 +131,7 @@ volumes:
backend:
connect:
couchdb:
- kafka:
- zookeeper:
+ kafka3:
influxdb:
influxdb2:
files:
diff --git a/installer/compose/docker-compose.yml
b/installer/compose/docker-compose.yml
index 6157972476..5c8a9e3c77 100644
--- a/installer/compose/docker-compose.yml
+++ b/installer/compose/docker-compose.yml
@@ -65,32 +65,22 @@ services:
spnet:
kafka:
- image: fogsyio/kafka:2.2.0
+ image: bitnami/kafka:3.9.0
hostname: kafka
- depends_on:
- - zookeeper
environment:
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092,OUTSIDE://localhost:9094 #
Replace localhost with your external address if Kafka should be reachable from
external systems.
- KAFKA_LISTENERS: PLAINTEXT://:9092,OUTSIDE://:9094
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
- volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
- logging: *default-logging
- restart: unless-stopped
- networks:
- spnet:
-
- zookeeper:
- image: fogsyio/zookeeper:3.4.13
+ - KAFKA_CFG_NODE_ID=0
+ - KAFKA_CFG_PROCESS_ROLES=controller,broker
+ - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+ -
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,OUTSIDE://:9094
+ -
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,OUTSIDE://localhost:9094
+ -
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
+ - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+ - KAFKA_CFG_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5000012
+ - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10000000
+ - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
volumes:
- - zookeeper:/opt/zookeeper-3.4.13
+ - kafka3:/bitnami
logging: *default-logging
restart: unless-stopped
networks:
@@ -131,8 +121,7 @@ volumes:
backend:
connect:
couchdb:
- kafka:
- zookeeper:
+ kafka3:
influxdb:
influxdb2:
files:
diff --git a/installer/k8s/README.md b/installer/k8s/README.md
index a093c40d11..7693cd2667 100644
--- a/installer/k8s/README.md
+++ b/installer/k8s/README.md
@@ -225,7 +225,7 @@ rm -rf ${HOME}/streampipes-k8s
| Parameter Name | Description
| Value |
|---------------------------------------------|------------------------------------------------------------------------------------------|-------------|
| external.kafka.appName | Kafka application name
| "kafka" |
-| external.kafka.version | Kafka version
| 2.2.0 |
+| external.kafka.version | Kafka version
| 3.9.0 |
| external.kafka.port | Port for the Kafka service
| 9092 |
| external.kafka.external.hostname | Name which will be advertised
to external clients. Clients which use (default) port 9094 | "localhost" |
| external.kafka.service.name | Name of the Kafka service
| "kafka" |
@@ -237,20 +237,6 @@ rm -rf ${HOME}/streampipes-k8s
| external.kafka.persistence.pvName | Name of the Kafka
PersistentVolume |
"kafka-pv" |
|
-#### Zookeeper common parameters
-
-| Parameter Name | Description
| Value |
-|-------------------------------------------------|---------------------------------------------|-----------------|
-| external.zookeeper.appName | ZooKeeper application name
| "zookeeper" |
-| external.zookeeper.version | ZooKeeper version
| 3.4.13 |
-| external.zookeeper.port | Port for the ZooKeeper
service | 2181 |
-| external.zookeeper.service.name | Name of the ZooKeeper
service | "zookeeper" |
-| external.zookeeper.service.port | TargetPort of the
ZooKeeper service | 2181 |
-| external.zookeeper.persistence.storageClassName | Storage class name for
ZooKeeper PVs | "hostpath" |
-| external.zookeeper.persistence.storageSize | Size of the ZooKeeper PV
| "1Gi" |
-| external.zookeeper.persistence.claimName | Name of the ZooKeeper
PersistentVolumeClaim | "zookeeper-pvc" |
-| external.zookeeper.persistence.pvName | Name of the ZooKeeper
PersistentVolume | "zookeeper-pv" |
-
#### Pulsar common parameters
| Parameter Name | Description
| Value |
diff --git a/installer/k8s/templates/core/backend-deployment.yaml
b/installer/k8s/templates/core/backend-deployment.yaml
index 96d09ee9c1..8103e20ac6 100644
--- a/installer/k8s/templates/core/backend-deployment.yaml
+++ b/installer/k8s/templates/core/backend-deployment.yaml
@@ -59,13 +59,6 @@ spec:
value: "{{ .Values.external.kafka.service.name }}"
- name: SP_KAFKA_PORT
value: "{{ .Values.external.kafka.service.port }}"
- - name: SP_ZOOKEEPER_HOST
- value: "{{ .Values.external.zookeeper.service.name }}"
- - name: SP_ZOOKEEPER_PORT
- value: "{{ .Values.external.zookeeper.service.port }}"
- {{- end }}
- {{- if eq .Values.preferredBroker "nats" }}
- value: "nats"
- name: SP_NATS_HOST
value: "{{ .Values.external.nats.service.name }}"
- name: SP_NATS_PORT
diff --git a/installer/k8s/templates/external/kafka/kafka-deployment.yaml
b/installer/k8s/templates/external/kafka/kafka-deployment.yaml
index 76ac96f049..659d56d148 100644
--- a/installer/k8s/templates/external/kafka/kafka-deployment.yaml
+++ b/installer/k8s/templates/external/kafka/kafka-deployment.yaml
@@ -29,17 +29,13 @@ spec:
app: {{ .Values.external.kafka.appName }}
spec:
restartPolicy: {{ .Values.restartPolicy }}
- initContainers:
- - name: init-wait
- image: alpine
- command: ["sh", "-c", "for i in $(seq 1 300); do nc -zvw1 {{
.Values.external.zookeeper.service.name }} {{
.Values.external.zookeeper.service.port }} && exit 0 || sleep 3; done; exit 1"]
volumes:
- name: {{ .Values.external.kafka.persistence.pvName }}
persistentVolumeClaim:
claimName: {{ .Values.external.kafka.persistence.claimName }}
containers:
- name: {{ .Values.external.kafka.appName }}
- image: fogsyio/kafka:{{ .Values.external.kafka.version }}
+ image: bitnami/kafka:{{ .Values.external.kafka.version }}
imagePullPolicy: {{ .Values.pullPolicy }}
ports:
- containerPort: {{ .Values.external.kafka.port }}
@@ -47,25 +43,29 @@ spec:
- mountPath: "/kafka"
name: {{ .Values.external.kafka.persistence.pvName }}
env:
- # Known issue with kafka running in kubernetes:
- # https://github.com/wurstmeister/kafka-docker/issues/122
- - name: KAFKA_PORT
- value: "{{ .Values.external.kafka.port }}"
- - name: KAFKA_ZOOKEEPER_CONNECT
- value: "zookeeper:{{ .Values.external.zookeeper.port }}"
- - name: KAFKA_LISTENERS
- value: "PLAINTEXT://:{{ .Values.external.kafka.port
}},OUTSIDE://:9094"
- - name: KAFKA_ADVERTISED_LISTENERS
- value: "PLAINTEXT://kafka:{{ .Values.external.kafka.port
}},OUTSIDE://{{ .Values.external.kafka.external.hostname }}:9094"
- - name: KAFKA_INTER_BROKER_LISTENER_NAME
+ - name: KAFKA_CFG_NODE_ID
+ value: "0"
+ - name: KAFKA_CFG_PROCESS_ROLES
+ value: "controller,broker"
+ - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
+ value: "0@kafka:9093"
+ - name: KAFKA_CFG_LISTENERS
+ value: "PLAINTEXT://:{{ .Values.external.kafka.port
}},CONTROLLER://:9093,OUTSIDE://:9094"
+ - name: KAFKA_CFG_ADVERTISED_LISTENERS
+ value: "PLAINTEXT://{{ .Values.external.kafka.port
}}:9092,OUTSIDE://{{ .Values.external.kafka.external.hostname }}:9094"
+ - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
+ value:
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT"
+ - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
value: "PLAINTEXT"
- - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
- value: "PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT"
- - name: KAFKA_MESSAGE_MAX_BYTES
+ - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
+ value: "CONTROLLER"
+ - name: KAFKA_CFG_PORT
+ value: "{{ .Values.external.kafka.port }}"
+ - name: KAFKA_CFG_MESSAGE_MAX_BYTES
value: "5000012"
- - name: KAFKA_FETCH_MESSAGE_MAX_BYTES
+ - name: KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES
value: "5000012"
- - name: KAFKA_REPLICA_FETCH_MAX_BYTES
+ - name: KAFKA_CFG_REPLICA_FETCH_MAX_BYTES
value: "10000000"
livenessProbe:
tcpSocket:
@@ -85,4 +85,4 @@ spec:
initialDelaySeconds: {{ .Values.initialDelaySeconds }}
periodSeconds: {{ .Values.periodSeconds }}
failureThreshold: {{ .Values.failureThreshold }}
-{{- end }}
\ No newline at end of file
+{{- end }}
diff --git
a/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml
b/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml
deleted file mode 100644
index 07314cdeb3..0000000000
--- a/installer/k8s/templates/external/zookeeper/zookeeper-deployment.yaml
+++ /dev/null
@@ -1,74 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{{- if eq .Values.preferredBroker "kafka" }}
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: {{ .Values.external.zookeeper.appName }}
-spec:
- selector:
- matchLabels:
- app: {{ .Values.external.zookeeper.appName }}
- replicas: 1
- template:
- metadata:
- labels:
- app: {{ .Values.external.zookeeper.appName }}
- spec:
- restartPolicy: {{ .Values.restartPolicy }}
- volumes:
- - name: {{ .Values.external.zookeeper.persistence.pvName }}
- persistentVolumeClaim:
- claimName: {{ .Values.external.zookeeper.persistence.claimName }}
- containers:
- #TODO: wurstmeister/zookeeper:latest is running ZK 3.4.13. Once this
- # changes, the mount path needs to be adapted
- - name: {{ .Values.external.zookeeper.appName }}
- image: fogsyio/zookeeper:{{ .Values.external.zookeeper.version }}
- imagePullPolicy: {{ .Values.pullPolicy }}
- ports:
- - containerPort: {{ .Values.external.zookeeper.port }}
- volumeMounts:
- - mountPath: "/opt/zookeeper-{{ .Values.external.zookeeper.version
}}/data"
- name: {{ .Values.external.zookeeper.persistence.pvName }}
- livenessProbe:
- exec:
- command:
- - sh
- - -c
- - echo ruok | nc localhost {{ .Values.external.zookeeper.port
}}
- initialDelaySeconds: {{ .Values.initialDelaySeconds }}
- periodSeconds: {{ .Values.periodSeconds }}
- failureThreshold: {{ .Values.failureThreshold }}
- readinessProbe:
- exec:
- command:
- - sh
- - -c
- - echo ruok | nc localhost {{ .Values.external.zookeeper.port
}}
- initialDelaySeconds: {{ .Values.initialDelaySeconds }}
- periodSeconds: {{ .Values.periodSeconds }}
- failureThreshold: {{ .Values.failureThreshold }}
- startupProbe:
- exec:
- command:
- - sh
- - -c
- - echo ruok | nc localhost {{ .Values.external.zookeeper.port
}}
- initialDelaySeconds: {{ .Values.initialDelaySeconds }}
- periodSeconds: {{ .Values.periodSeconds }}
- failureThreshold: {{ .Values.failureThreshold }}
-{{- end }}
\ No newline at end of file
diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml
b/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml
deleted file mode 100644
index 465963d6e8..0000000000
--- a/installer/k8s/templates/external/zookeeper/zookeeper-pvc.yaml
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{{- if eq .Values.preferredBroker "kafka" }}
-apiVersion: v1
-kind: PersistentVolume
-metadata:
- name: {{ .Values.external.zookeeper.persistence.pvName }}
-spec:
- storageClassName: {{ .Values.external.zookeeper.persistence.storageClassName
}}
- capacity:
- storage: {{ .Values.external.zookeeper.persistence.storageSize }}
- accessModes:
- - {{ .Values.persistentVolumeAccessModes }}
- persistentVolumeReclaimPolicy: {{ .Values.persistentVolumeReclaimPolicy }}
- hostPath:
- path: {{ .Values.hostPath }}/zookeeper
----
-apiVersion: v1
-kind: PersistentVolumeClaim
-metadata:
- labels:
- app: {{ .Values.external.zookeeper.appName }}
- name: {{ .Values.external.zookeeper.persistence.claimName }}
-spec:
- storageClassName: {{ .Values.external.zookeeper.persistence.storageClassName
}}
- accessModes:
- - {{ .Values.persistentVolumeAccessModes }}
- resources:
- requests:
- storage: {{ .Values.external.zookeeper.persistence.storageSize }}
-{{- end }}
\ No newline at end of file
diff --git a/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml
b/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml
deleted file mode 100644
index 492d0558ea..0000000000
--- a/installer/k8s/templates/external/zookeeper/zookeeper-service.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{{- if eq .Values.preferredBroker "kafka" }}
-apiVersion: v1
-kind: Service
-metadata:
- name: {{ .Values.external.zookeeper.service.name }}
-spec:
- selector:
- app: {{ .Values.external.zookeeper.appName }}
- ports:
- - name: main
- protocol: TCP
- port: {{ .Values.external.zookeeper.port }}
- targetPort: {{ .Values.external.zookeeper.port }}
-{{- end }}
\ No newline at end of file
diff --git a/installer/k8s/values.yaml b/installer/k8s/values.yaml
index 68b7e2e95c..e6601bf64d 100644
--- a/installer/k8s/values.yaml
+++ b/installer/k8s/values.yaml
@@ -152,18 +152,6 @@ external:
storageSize: "1Gi"
claimName: "kafka-pvc"
pvName: "kafka-pv"
- zookeeper:
- appName: "zookeeper"
- version: 3.4.13
- port: 2181
- service:
- name: "zookeeper"
- port: 2181
- persistence:
- storageClassName: "hostpath"
- storageSize: "1Gi"
- claimName: "zookeeper-pvc"
- pvName: "zookeeper-pv"
pulsar:
appName: "pulsar"
version: 3.0.0
diff --git
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
index 0e00e94b01..ce10fcb759 100644
---
a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
+++
b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java
@@ -78,7 +78,6 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
public static final String ID =
"org.apache.streampipes.connect.iiot.protocol.stream.kafka";
- private Thread thread;
private SpKafkaConsumer kafkaConsumer;
public KafkaProtocol() {
@@ -190,13 +189,9 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
protocol.setTopicDefinition(new SimpleTopicDefinition(config.getTopic()));
this.kafkaConsumer = new SpKafkaConsumer(protocol,
- config.getTopic(),
- new BrokerEventProcessor(extractor.selectedParser(), collector),
config.getConfigAppenders()
);
-
- thread = new Thread(this.kafkaConsumer);
- thread.start();
+ this.kafkaConsumer.connect(new
BrokerEventProcessor(extractor.selectedParser(), collector));
}
@Override
@@ -215,7 +210,6 @@ public class KafkaProtocol implements StreamPipesAdapter,
SupportsRuntimeConfig
}
LOG.info("Kafka Adapter was sucessfully stopped");
- thread.interrupt();
}
@Override
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index 6b8f1bbc23..8413452765 100644
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -41,6 +41,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class SpKafkaConsumer implements EventConsumer, Runnable,
@@ -50,56 +52,25 @@ public class SpKafkaConsumer implements EventConsumer,
Runnable,
private InternalEventProcessor<byte[]> eventProcessor;
private final KafkaTransportProtocol protocol;
private volatile boolean isRunning;
- private Boolean patternTopic = false;
private List<KafkaConfigAppender> appenders = new ArrayList<>();
+ private KafkaConsumer<byte[], byte[]> consumer;
private static final Logger LOG =
LoggerFactory.getLogger(SpKafkaConsumer.class);
public SpKafkaConsumer(KafkaTransportProtocol protocol) {
this.protocol = protocol;
+ this.topic = protocol.getTopicDefinition().getActualTopicName();
}
public SpKafkaConsumer(KafkaTransportProtocol protocol,
- String topic,
- InternalEventProcessor<byte[]> eventProcessor) {
- this.protocol = protocol;
- this.topic = topic;
- this.eventProcessor = eventProcessor;
- this.isRunning = true;
- }
-
- public SpKafkaConsumer(KafkaTransportProtocol protocol,
- String topic,
- InternalEventProcessor<byte[]> eventProcessor,
List<KafkaConfigAppender> appenders) {
- this(protocol, topic, eventProcessor);
+ this(protocol);
this.appenders = appenders;
}
@Override
public void run() {
-
- Properties props = makeProperties(protocol, appenders);
-
- LOG.info("Using kafka properties: {}", props.toString());
- KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
- if (!patternTopic) {
- consumer.subscribe(Collections.singletonList(topic));
- } else {
- topic = replaceWildcardWithPatternFormat(topic);
- consumer.subscribe(Pattern.compile(topic), new
ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
- // TODO
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
- // TODO
- }
- });
- }
Duration duration = Duration.of(100, ChronoUnit.MILLIS);
while (isRunning) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
@@ -108,40 +79,76 @@ public class SpKafkaConsumer implements EventConsumer,
Runnable,
consumer.close();
}
- private String replaceWildcardWithPatternFormat(String topic) {
- topic = topic.replaceAll("\\.", "\\\\.");
- return topic.replaceAll("\\*", ".*");
- }
-
- private Properties makeProperties(KafkaTransportProtocol protocol,
- List<KafkaConfigAppender> appenders) {
- return new ConsumerConfigFactory(protocol).buildProperties(appenders);
- }
-
@Override
public void connect(InternalEventProcessor<byte[]> eventProcessor) throws
SpRuntimeException {
- LOG.info("Kafka consumer: Connecting to " +
protocol.getTopicDefinition().getActualTopicName());
- if (protocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
- this.patternTopic = true;
- }
+ LOG.info("Kafka consumer: Connecting to {}",
protocol.getTopicDefinition().getActualTopicName());
+ var patternTopic = isPatternTopic();
this.eventProcessor = eventProcessor;
-
- this.topic = protocol.getTopicDefinition().getActualTopicName();
this.isRunning = true;
+ Properties props = makeProperties(protocol, appenders);
+
+ consumer = new KafkaConsumer<>(props);
+ var latch = new CountDownLatch(1);
+ if (!patternTopic) {
+ consumer.subscribe(Collections.singletonList(topic), new
RebalanceListener(latch));
+ } else {
+ topic = replaceWildcardWithPatternFormat(topic);
+ consumer.subscribe(Pattern.compile(topic), new RebalanceListener(latch));
+ }
Thread thread = new Thread(this);
thread.start();
+ try {
+ if (!latch.await(10, TimeUnit.SECONDS)) {
+ throw new SpRuntimeException("Timeout while waiting for partition
assignment");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SpRuntimeException("Interrupted while waiting for partition
assignment", e);
+ }
}
@Override
public void disconnect() throws SpRuntimeException {
- LOG.info("Kafka consumer: Disconnecting from " + topic);
+ LOG.info("Kafka consumer: Disconnecting from {}", topic);
this.isRunning = false;
-
}
@Override
public boolean isConnected() {
return isRunning;
}
+
+ private boolean isPatternTopic() {
+ return this.protocol.getTopicDefinition() instanceof
WildcardTopicDefinition;
+ }
+
+ private String replaceWildcardWithPatternFormat(String topic) {
+ topic = topic.replaceAll("\\.", "\\\\.");
+ return topic.replaceAll("\\*", ".*");
+ }
+
+ private Properties makeProperties(KafkaTransportProtocol protocol,
+ List<KafkaConfigAppender> appenders) {
+ return new ConsumerConfigFactory(protocol).buildProperties(appenders);
+ }
+
+ private class RebalanceListener implements ConsumerRebalanceListener {
+
+ private final CountDownLatch latch;
+ public RebalanceListener(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> collection) {
+ consumer.pause(collection);
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ consumer.resume(partitions);
+ latch.countDown();
+ }
+ }
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java
index 02b244d22c..8e302ae163 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataExplorerWidgetModel.java
@@ -19,12 +19,14 @@
package org.apache.streampipes.model.datalake;
import org.apache.streampipes.model.dashboard.DashboardEntity;
+import org.apache.streampipes.model.shared.annotation.TsModel;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.HashMap;
import java.util.Map;
+@TsModel
public class DataExplorerWidgetModel extends DashboardEntity {
private String widgetId;
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
index 9fe5228abe..8af85bdd56 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/grounding/KafkaTransportProtocol.java
@@ -20,12 +20,6 @@ package org.apache.streampipes.model.grounding;
public class KafkaTransportProtocol extends TransportProtocol {
- private static final long serialVersionUID = -4067982203807146257L;
-
- private String zookeeperHost;
-
- private int zookeeperPort;
-
private int kafkaPort;
private Integer lingerMs;
@@ -44,24 +38,12 @@ public class KafkaTransportProtocol extends
TransportProtocol {
public KafkaTransportProtocol(String kafkaHost, int kafkaPort, String topic)
{
super(kafkaHost, new SimpleTopicDefinition(topic));
- this.zookeeperHost = kafkaHost;
- this.zookeeperPort = kafkaPort;
- this.kafkaPort = kafkaPort;
- }
-
- public KafkaTransportProtocol(String kafkaHost, int kafkaPort, String topic,
String zookeeperHost,
- int zookeeperPort) {
- super(kafkaHost, new SimpleTopicDefinition(topic));
- this.zookeeperHost = zookeeperHost;
- this.zookeeperPort = zookeeperPort;
this.kafkaPort = kafkaPort;
}
public KafkaTransportProtocol(KafkaTransportProtocol other) {
super(other);
this.kafkaPort = other.getKafkaPort();
- this.zookeeperHost = other.getZookeeperHost();
- this.zookeeperPort = other.getZookeeperPort();
this.acks = other.getAcks();
this.batchSize = other.getBatchSize();
this.groupId = other.getGroupId();
@@ -74,34 +56,12 @@ public class KafkaTransportProtocol extends
TransportProtocol {
public KafkaTransportProtocol(String kafkaHost, Integer kafkaPort,
WildcardTopicDefinition wildcardTopicDefinition) {
super(kafkaHost, wildcardTopicDefinition);
this.kafkaPort = kafkaPort;
- this.zookeeperHost = kafkaHost;
- this.zookeeperPort = kafkaPort;
}
public KafkaTransportProtocol() {
super();
}
- public static long getSerialVersionUID() {
- return serialVersionUID;
- }
-
- public String getZookeeperHost() {
- return zookeeperHost;
- }
-
- public void setZookeeperHost(String zookeeperHost) {
- this.zookeeperHost = zookeeperHost;
- }
-
- public int getZookeeperPort() {
- return zookeeperPort;
- }
-
- public void setZookeeperPort(int zookeeperPort) {
- this.zookeeperPort = zookeeperPort;
- }
-
public int getKafkaPort() {
return kafkaPort;
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
index e1eb685f6d..e1eeb4f58f 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/ProtocolSelector.java
@@ -114,9 +114,7 @@ public class ProtocolSelector extends GroundingSelector {
return new KafkaTransportProtocol(
messagingSettings.getKafkaHost(),
messagingSettings.getKafkaPort(),
- outputTopic,
- messagingSettings.getZookeeperHost(),
- messagingSettings.getZookeeperPort()
+ outputTopic
);
}
diff --git
a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
index 78d291a369..ddfee95cdc 100644
---
a/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
+++
b/streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/matching/v2/TestUtils.java
@@ -25,7 +25,7 @@ import
org.apache.streampipes.model.grounding.TransportProtocol;
public class TestUtils {
public static TransportProtocol kafkaProtocol() {
- return new KafkaTransportProtocol("localhost", 9092, "abc", "localhost",
2181);
+ return new KafkaTransportProtocol("localhost", 9092, "abc");
}
public static TransportProtocol jmsProtocol() {
diff --git
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
index f9c25f11c8..f20e15f40f 100644
---
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
+++
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Protocols.java
@@ -36,7 +36,7 @@ public class Protocols {
* containing URL and topic where data arrives.
*/
public static KafkaTransportProtocol kafka(String kafkaHost, Integer
kafkaPort, String topic) {
- return new KafkaTransportProtocol(kafkaHost, kafkaPort, topic, kafkaHost,
kafkaPort);
+ return new KafkaTransportProtocol(kafkaHost, kafkaPort, topic);
}
/**
diff --git
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index 31f14e6b19..03013586f6 100644
---
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -20,7 +20,7 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 3.2.1263 on 2025-01-25
23:02:05.
+// Generated using typescript-generator version 3.2.1263 on 2025-02-14
21:48:17.
export class NamedStreamPipesEntity implements Storable {
'@class':
@@ -1188,61 +1188,6 @@ export class DashboardModel implements Storable {
}
}
-export class DashboardWidgetModel extends DashboardEntity {
- dashboardWidgetSettings: DashboardWidgetSettings;
- pipelineId: string;
- visualizationName: string;
- widgetId: string;
- widgetType: string;
-
- static fromData(
- data: DashboardWidgetModel,
- target?: DashboardWidgetModel,
- ): DashboardWidgetModel {
- if (!data) {
- return data;
- }
- const instance = target || new DashboardWidgetModel();
- super.fromData(data, instance);
- instance.dashboardWidgetSettings = DashboardWidgetSettings.fromData(
- data.dashboardWidgetSettings,
- );
- instance.pipelineId = data.pipelineId;
- instance.visualizationName = data.visualizationName;
- instance.widgetId = data.widgetId;
- instance.widgetType = data.widgetType;
- return instance;
- }
-}
-
-export class DashboardWidgetSettings {
- config: StaticPropertyUnion[];
- requiredSchema: EventSchema;
- widgetDescription: string;
- widgetIconName: string;
- widgetLabel: string;
- widgetName: string;
-
- static fromData(
- data: DashboardWidgetSettings,
- target?: DashboardWidgetSettings,
- ): DashboardWidgetSettings {
- if (!data) {
- return data;
- }
- const instance = target || new DashboardWidgetSettings();
- instance.config = __getCopyArrayFn(StaticProperty.fromDataUnion)(
- data.config,
- );
- instance.requiredSchema = EventSchema.fromData(data.requiredSchema);
- instance.widgetDescription = data.widgetDescription;
- instance.widgetIconName = data.widgetIconName;
- instance.widgetLabel = data.widgetLabel;
- instance.widgetName = data.widgetName;
- return instance;
- }
-}
-
export class DataExplorerWidgetModel extends DashboardEntity {
baseAppearanceConfig: { [index: string]: any };
dataConfig: { [index: string]: any };
@@ -2208,8 +2153,6 @@ export class KafkaTransportProtocol extends
TransportProtocol {
'maxRequestSize': string;
'messageMaxBytes': string;
'offset': string;
- 'zookeeperHost': string;
- 'zookeeperPort': number;
static 'fromData'(
data: KafkaTransportProtocol,
@@ -2228,8 +2171,6 @@ export class KafkaTransportProtocol extends
TransportProtocol {
instance.maxRequestSize = data.maxRequestSize;
instance.messageMaxBytes = data.messageMaxBytes;
instance.offset = data.offset;
- instance.zookeeperHost = data.zookeeperHost;
- instance.zookeeperPort = data.zookeeperPort;
return instance;
}
}
@@ -4139,30 +4080,6 @@ export class UserInfo {
}
}
-export class VisualizablePipeline {
- pipelineId: string;
- pipelineName: string;
- schema: EventSchema;
- topic: string;
- visualizationName: string;
-
- static fromData(
- data: VisualizablePipeline,
- target?: VisualizablePipeline,
- ): VisualizablePipeline {
- if (!data) {
- return data;
- }
- const instance = target || new VisualizablePipeline();
- instance.pipelineId = data.pipelineId;
- instance.pipelineName = data.pipelineName;
- instance.schema = EventSchema.fromData(data.schema);
- instance.topic = data.topic;
- instance.visualizationName = data.visualizationName;
- return instance;
- }
-}
-
export class WildcardTopicDefinition extends TopicDefinition {
'@class': 'org.apache.streampipes.model.grounding.WildcardTopicDefinition';
'wildcardTopicMappings': WildcardTopicMapping[];