intigration commented on issue #4140:
URL: https://github.com/apache/streampipes/issues/4140#issuecomment-3851297146
### **1. Kafka Configuration**
Broker Configuration
**kafka:** (docker-compose.kafka.yaml)
```
image: wurstmeister/kafka
container_name: kafka environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 0
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://:29092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
- ports:
- "29092:29092" # External access
- "9092:9092" # Internal access
```
**Topic**: opc-ae-events
2. Raw JSON Message Example
Based on the OPC Event data being published, each Kafka message has this
structure:
```{ "EventID": "", "AssetPath": "Tunnel_2.OPC_2.GOL_E_GOHAR_II_SRV",
"AssetName": "HP31-BF310/NAI_LIM", "TagName": "HP31-BF310", "TagDescription":
"HP31-BF310/NAI_LIM", "BlockName": "Limit Value Alarm high", "VT_Start":
"2025-03-17 17:29:42", "EventName": "ReturnToNormal", "IntervalIdentifier":
"Alarm high", "Parameter": "1231", "Priority": "1", "FromValue": "15.0",
"ToValue": "1100.0", "Limit": "8.0", "Value": "1.0", "Console": "USER=",
"UnitOfMeasure": "", "Operator": "", "Message": "COMMENT=,Flow Filter cake
Supply Alarm high >1200.000000", "CloseAlarm": "1", "CloseAllAlarms": "0",
"Suppressed": "No", "Shelved": "1", "system_inputname": "",
"SupressCategory": "", "ShelvedReason": "", "system_messagetypename": "Alarm
high", "Visibility": "1", "SourceEventID": "671126554",
"ProcessChildAssets": "0", "ReceivedDelay": "", "ProcessComment": "IE
VENT SUBCONDITION NAME=Alarm high,TEXT05=,TEXT06=,TEXT07=,TEXT08=,TEXT09=",
"Custom1": "65538", "Custom2": "['HP31-BF310/NAI_LIM', 'Mixing Area']",
"Custom3": "['Mixing Area', ['FromValue', 'ToValue', 'Limit', 'Value']]",
"Custom4": "['Mixing Area', ['FromValue', 'ToValue', 'Limit', 'Value']]",
"Custom5": "['3/17/2025 5:29:42 PM']", "Custom6": "", "Custom7": "",
"Custom8": "", "Custom9": "", "Custom10": "", "Filter1": "", "Filter2": "",
"Filter3": "", "Filter4": "", "Filter5": "", "CreatedBy": "System",
"CreatedDate": "2026-01-31 04:48:55.951780", "EventModifiedBy": "System",
"EventModifiedTimeStamp": "2026-01-31 04:48:55.951780", "system_spare1": "",
"Company": "Tunnel_2", "Plant": "OPC_2", "Area": "Mixing Area",
"published_timestamp": 1738756312000, "record_index": 1, "source":
"csv_export"}```
**3. Schema Consistency**
✅ Single, Consistent Schema: All events in the opc-ae-events topic follow
the same structure with 54 fields from the CSV plus 3 metadata fields added by
the publisher:
CSV Fields (54):
`EventID, AssetPath, AssetName, TagName, TagDescription, BlockName,
VT_Start, EventName, IntervalIdentifier, Parameter, Priority, FromValue,
ToValue, Limit, Value, Console, UnitOfMeasure, Operator, Message, CloseAlarm,
CloseAllAlarms, Suppressed, Shelved, system_inputname, SupressCategory,
ShelvedReason, system_messagetypename, Visibility, SourceEventID,
ProcessChildAssets, ReceivedDelay, ProcessComment, Custom1-10, Filter1-5,
CreatedBy, CreatedDate, EventModifiedBy, EventModifiedTimeStamp, system_spare1,
Company, Plant, Area`
**Publisher Metadata :**
- published_timestamp: Unix timestamp in milliseconds
- record_index: Sequential record number
- source: Always "csv_export"
4. Publisher Configuration
publishes to:
- Kafka Topic: opc-ae-events
- NATS Subject: events.alarms
- Rate: ~9.3 records/second
- Max Records: 1000
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]