amanchaudhary-95 opened a new issue, #9992: URL: https://github.com/apache/seatunnel/issues/9992
### Search before asking

- [x] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

I'm trying to stream data from a Kafka consumer to IoTDB. The data is IoT sensor data; below is a sample message from the Kafka broker that I'm streaming to IoTDB:

```
{
  "mass_flow": 2.208682858804355,
  "density": 41.27306775668481,
  "volume_flow": 2.7837518158137557,
  "sensor_fault_flags": true,
  "temperature": 84.57554188821447,
  "timestamp": 1761550505272,
  "sensorId": "flowrate02"
}
```

I have the following configuration, which works:

```
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  Kafka {
    # Kafka bootstrap servers
    bootstrap.servers = "localhost:9094"

    # Topic
    topic = "sp_iotdb_buffer"

    # Consumer group
    consumer.group.id = "seatunnel_iotdb_group"

    # Start position: earliest | latest
    start_mode = "earliest"

    # Message format - json
    format = "json"

    schema = {
      fields = {
        mass_flow = "double"
        density = "double"
        volume_flow = "double"
        sensor_fault_flags = "boolean"
        temperature = "double"
        timestamp = "bigint"
        sensorId = "string"
      }
    }

    # Output table name
    plugin_output = "kafka"
  }
}

sink {
  IoTDB {
    # Read directly from Kafka source table
    plugin_input = "kafka"

    # IoTDB connection and mapping
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    key_device = "device_id"      # value of sensorId will be used as device id
    key_timestamp = "timestamp"   # event time column (ms epoch)
    timestamp_precision = "ms"

    # Device/time mapping
    storage_group = "root.test"   # will become root.test.<sensorId>

    # Optional tuning
    aligned = false
    batch_size = 1024
    retry = 3
  }
}
```

The above config works fine and sends data to IoTDB. The issue is that there are thousands of sensors, and it is not practical to provide the `schema` for all of them. Although `schema` is an optional parameter in the Kafka source connector, if I remove it the JSON is not auto-decoded and the whole message is treated as a raw string. Also, the IoTDB sink expects the `key_device` and `timestamp` columns as inputs.

The Kafka source connector should be able to auto-detect the schema from the messages. As of now, even if I put

```
schema = {
  fields = {}
}
```

it removes all the fields from the message. The `schema` parameter should be made truly optional, in the sense that it is used only to define those fields that JSON can't auto-detect or detects incorrectly (see the sketch below).
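To make the ask concrete, here is a rough sketch of the behaviour I have in mind. This is hypothetical syntax — today an incomplete `fields` block strips columns rather than overriding them — and it reuses only the options from my working config above:

```
source {
  Kafka {
    bootstrap.servers = "localhost:9094"
    topic = "sp_iotdb_buffer"
    consumer.group.id = "seatunnel_iotdb_group"
    start_mode = "earliest"
    format = "json"

    # Hypothetical behaviour: no full field list needed. Auto-detection
    # would infer mass_flow/density/volume_flow/temperature as double,
    # sensor_fault_flags as boolean, and sensorId as string. You only
    # override what inference would get wrong, e.g. force the epoch-ms
    # timestamp to bigint instead of double:
    schema = {
      fields = {
        timestamp = "bigint"
      }
    }

    plugin_output = "kafka"
  }
}
```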
PS: I tried other formats such as `COMPATIBLE_KAFKA_CONNECT_JSON`, `debezium_json`, `canal_json`, and `avro`. None of them worked.

### Usage Scenario

It will make IoT sensor data streaming more accessible using Kafka, as SeaTunnel doesn't have an MQTT connector for industrial IoT data streaming.

### Related issues

**I'm not sure, but this may be useful.** I used [`telegraf`](https://github.com/influxdata/telegraf/tree/v1.36.3) and was able to send data from Kafka to IoTDB. `Telegraf` was able to auto-parse the JSON schema with its `json_v2` parser, which uses `GJSON`. This is the config I used in `telegraf`:

```
# Telegraf Configuration for Kafka to IoTDB Pipeline
# This configuration reads data from Kafka broker and writes to IoTDB

# Global tags can be specified here in key="value" format.

# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "1s"
  ## Rounds collection interval to 'interval'
  round_interval = true
  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics. For streaming, use small batch size
  metric_batch_size = 10
  ## Maximum number of unwritten metrics per output.
  ## Keep this low for streaming to avoid buffering
  metric_buffer_limit = 1000
  ## Collection jitter is used to jitter the collection by a random amount.
  collection_jitter = "0s"
  ## Default flushing interval for all outputs.
  ## For streaming, flush immediately or very frequently
  flush_interval = "1s"
  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do not set the "host" tag in the telegraf agent.
  omit_hostname = true
  ## Enable debug mode for detailed logging
  debug = true
  ## Log only error level messages
  quiet = true

###############################################################################
#                              INPUT PLUGINS                                  #
###############################################################################

# Read metrics from Kafka topics
[[inputs.kafka_consumer]]
  ## Kafka brokers - specify your Kafka broker addresses
  brokers = ["localhost:9094"]
  ## Topics to consume - add all topics you want to consume from
  topics = ["sp_iotdb_buffer"]
  ## Optional topic pattern - can be used instead of topics
  ## This will consume from all topics matching the pattern
  # topic_regex = "^telegraf-.*$"
  ## Consumer group name
  ## All consumers within a group will share the same offset
  consumer_group = "telegraf_consumer"
  ## Offset to start consuming from
  ## Available values: oldest, newest
  offset = "newest"
  ## Kafka version
  version = "4.1.0"
  ## Balancing strategy
  ## Options: range, roundrobin, sticky
  balance_strategy = "range"
  ## Maximum length of a message to consume, in bytes (default 0/unlimited)
  # max_message_len = 1000000
  ## Data format to consume
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json_v2"

  ## JSON v2 configuration
  ## Documentation: https://github.com/influxdata/telegraf/tree/master/plugins/parsers/json_v2
  [[inputs.kafka_consumer.json_v2]]
    ## Measurement name configuration
    measurement_name = "root.test.d2"
    ## Timestamp configuration
    timestamp_path = "timestamp"
    timestamp_format = "unix_ms"
    # timestamp_timezone = "UTC"

    ## Object configuration - parse the entire JSON object
    [[inputs.kafka_consumer.json_v2.object]]
      path = "@this"  ## Use the entire JSON object
      ## This will automatically detect and parse all fields from the JSON
      ## Numeric values become fields, strings can be configured as tags or fields
      excluded_keys = ["timestamp"]  ## Exclude timestamp from being parsed as a field

###############################################################################
#                              OUTPUT PLUGINS                                 #
###############################################################################

# Save metrics to an IoTDB Database
[[outputs.iotdb]]
  ## Configuration of IoTDB server connection
  host = "localhost"
  port = "6667"
  ## Configuration of authentication
  user = "root"
  password = "root"
  ## Timeout to open a new session.
  ## A value of zero means no timeout.
  timeout = "5s"

  ## Configuration of type conversion for 64-bit unsigned int
  ## IoTDB currently DOES NOT support unsigned integers (version 13.x).
  ## 32-bit unsigned integers are safely converted into 64-bit signed integers by the plugin,
  ## however, this is not true for 64-bit values in general as overflows may occur.
  ## The following setting allows to specify the handling of 64-bit unsigned integers.
  ## Available values are:
  ##   - "int64"      -- convert to 64-bit signed integers and accept overflows
  ##   - "int64_clip" -- convert to 64-bit signed integers and clip the values on
  ##                     overflow to 9,223,372,036,854,775,807
  ##   - "text"       -- convert to the string representation of the value
  uint64_conversion = "int64_clip"

  ## Configuration of TimeStamp
  ## TimeStamp is always saved in 64bits int. timestamp_precision specifies the unit of timestamp.
  ## Available value:
  ## "second", "millisecond", "microsecond", "nanosecond" (default)
  timestamp_precision = "millisecond"
```
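For what it's worth, this is my reading of what the `json_v2` section above produces from the sample message (values copied from the message; treat this as an illustration, not Telegraf's exact output):

```
# Approximate metric produced from the sample message:
#   measurement: root.test.d2
#   fields:      mass_flow=2.208682858804355, density=41.27306775668481,
#                volume_flow=2.7837518158137557, sensor_fault_flags=true,
#                temperature=84.57554188821447, sensorId="flowrate02"
#   time:        1761550505272 (read via timestamp_path as unix_ms and
#                dropped from the fields by excluded_keys)
```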
The issue with `telegraf` is that it doesn't natively support clustering and streaming the way SeaTunnel does.

### Are you willing to submit a PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
