amanchaudhary-95 opened a new issue, #9992:
URL: https://github.com/apache/seatunnel/issues/9992

   ### Search before asking
   
   - [x] I had searched in the 
[feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22)
 and found no similar feature requirement.
   
   
   ### Description
   
   I'm trying to stream IoT sensor data from Kafka to IoTDB. Below is a sample 
message from the Kafka topic:
   
   
```
{"mass_flow":2.208682858804355,"density":41.27306775668481,"volume_flow":2.7837518158137557,"sensor_fault_flags":true,"temperature":84.57554188821447,"timestamp":1761550505272,"sensorId":"flowrate02"}
```
   
   The following configuration works:
   ```
   env {
     parallelism = 2
     job.mode = "STREAMING"
     checkpoint.interval = 2000
   }
   
   source{
     Kafka {
       # Kafka bootstrap servers
       bootstrap.servers = "localhost:9094"
   
       # Topic
       topic = "sp_iotdb_buffer"
   
       # Consumer group
       consumer.group.id = "seatunnel_iotdb_group"
   
       # Start position: earliest | latest
       start_mode = "earliest"
   
       # Message format: JSON, decoded with the schema below
       format = "json"
   
       schema = {
         fields = {
           mass_flow = "double"
           density = "double"
           volume_flow = "double"
           sensor_fault_flags = "boolean"
           temperature = "double"
           timestamp = "bigint"
           sensorId = "string"
         }
       }
   
       # Output table name
       plugin_output = "kafka"
     }
   }
   
   sink {
     IoTDB {
       # Read directly from Kafka source table
       plugin_input = "kafka"
   
       # IoTDB connection and mapping
       node_urls = ["localhost:6667"]
       username = "root"
       password = "root"
       key_device = "device_id" ##value of sensorId will be used as device id
       key_timestamp = "timestamp"    # event time column (ms epoch)
       timestamp_precision = "ms"
   
       # Device/time mapping
       storage_group = "root.test" # will become root.test.<sensorId>
       
       # Optional tuning
       aligned = false
       batch_size = 1024
       retry = 3
     }
   }
   ```
   
   The above config works fine and sends data to IoTDB. The issue is that there 
are thousands of sensors, and it may not be possible to provide the `schema` 
for all of them. Although `schema` is an optional parameter in the Kafka 
source connector, if I remove it, the JSON message is not decoded into fields 
and is treated as a raw string. Also, the IoTDB sink expects the `key_device` 
and `key_timestamp` fields as inputs. 
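   
   To illustrate why a raw string is not enough for the sink, here is a rough 
sketch of the mapping I expect from one decoded message to an IoTDB record. It 
is not SeaTunnel code: `to_iotdb_record` is a hypothetical helper, and it 
assumes the device key is the `sensorId` field, the timestamp is epoch 
milliseconds, and the device path is `<storage_group>.<sensorId>`.

```python
import json


def to_iotdb_record(message, key_device="sensorId", key_timestamp="timestamp",
                    storage_group="root.test"):
    """Split one JSON message into (device path, timestamp, measurements).

    Hypothetical helper for illustration only; the parameter names mirror
    the sink options used in the config above.
    """
    record = json.loads(message)
    # Device path, e.g. root.test.flowrate02
    device = f"{storage_group}.{record[key_device]}"
    # Event time taken from the message (assumed to be epoch milliseconds)
    timestamp = record[key_timestamp]
    # Every remaining field becomes a measurement on that device
    measurements = {
        name: value
        for name, value in record.items()
        if name not in (key_device, key_timestamp)
    }
    return device, timestamp, measurements
```

   None of this is possible unless the message is first decoded into named 
fields, which is what currently requires the explicit `schema`.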
   
   The Kafka source connector should be able to auto-detect the schema from the 
messages. As of now, even if I put
   ```
       schema = {
         fields = {}
       }
   ```
   it removes all the fields from the message. The `schema` parameter 
should be made truly optional, in the sense that it should be needed only to 
define those fields that JSON auto-detection can't infer or infers incorrectly.
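   
   A minimal sketch of the behaviour I have in mind, written in Python only 
for illustration. It is not how SeaTunnel is implemented: `infer_type` and 
`infer_schema` are hypothetical names, and the mapping from JSON values to 
type names (including the `string` fallback for null and nested values) is my 
assumption. Types are inferred from the message itself, and any user-supplied 
`schema` entries only override the inferred ones.

```python
import json


def infer_type(value):
    """Map a decoded JSON value to a type name (assumed mapping)."""
    # bool must be checked before int: in Python, bool is a subclass of int
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    # Strings, null, arrays and nested objects fall back to "string" here
    return "string"


def infer_schema(message, overrides=None):
    """Infer field types from one JSON message, then apply user overrides."""
    record = json.loads(message)
    fields = {name: infer_type(value) for name, value in record.items()}
    # Entries from the user's schema win over the auto-detected types
    fields.update(overrides or {})
    return fields
```

   With the sample message above, `infer_schema(message)` would give the same 
field types I currently have to write by hand, and something like 
`infer_schema(message, {"temperature": "float"})` would change only that one 
field.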
   
   PS: I tried other formats such as `COMPATIBLE_KAFKA_CONNECT_JSON`, 
`debezium_json`, `canal_json` and `avro`. None of them work.
   
   ### Usage Scenario
   
   It will make IoT sensor data streaming through Kafka more accessible, since 
SeaTunnel doesn't have an MQTT connector for industrial IoT data streaming.
   
   ### Related issues
   
   **I'm not sure, but this may be useful.** With 
[`telegraf`](https://github.com/influxdata/telegraf/tree/v1.36.3) I was 
able to send data from Kafka to IoTDB. Its `json_v2` parser, which uses 
`GJSON`, parsed the JSON fields automatically. This is the `telegraf` config I 
used:
   
   ```
   # Telegraf Configuration for Kafka to IoTDB Pipeline
   # This configuration reads data from Kafka broker and writes to IoTDB
   
   # Global tags can be specified here in key="value" format.
   
   # Configuration for telegraf agent
   [agent]
     ## Default data collection interval for all inputs
     interval = "1s"
     
     ## Rounds collection interval to 'interval'
     round_interval = true
   
     ## Telegraf will send metrics to outputs in batches of at most
     ## metric_batch_size metrics. For streaming, use small batch size
     metric_batch_size = 10
   
     ## Maximum number of unwritten metrics per output.
     ## Keep this low for streaming to avoid buffering
     metric_buffer_limit = 1000
   
     ## Collection jitter is used to jitter the collection by a random amount.
     collection_jitter = "0s"
   
     ## Default flushing interval for all outputs.
     ## For streaming, flush immediately or very frequently
     flush_interval = "1s"
   
     ## Override default hostname, if empty use os.Hostname()
     hostname = ""
     
     ## If set to true, do no set the "host" tag in the telegraf agent.
     omit_hostname = true
   
     ## Enable debug mode for detailed logging
     debug = true
   
     ## Log only error level messages
     quiet = true
   
   
   
   ###############################################################################
   #                            INPUT PLUGINS                                    #
   ###############################################################################
   
   # Read metrics from Kafka topics
   [[inputs.kafka_consumer]]
     ## Kafka brokers - specify your Kafka broker addresses
     brokers = ["localhost:9094"]
   
     ## Topics to consume - add all topics you want to consume from
     topics = ["sp_iotdb_buffer"]
     
     ## Optional topic pattern - can be used instead of topics
     ## This will consume from all topics matching the pattern
     # topic_regex = "^telegraf-.*$"
   
     ## Consumer group name
     ## All consumers within a group will share the same offset
     consumer_group = "telegraf_consumer"
   
     ## Offset to start consuming from
     ## Available values: oldest, newest
     offset = "newest"
   
     ## Kafka version
     version = "4.1.0"
   
     ## Balancing strategy
     ## Options: range, roundrobin, sticky
     balance_strategy = "range"
   
     ## Maximum length of a message to consume, in bytes (default 0/unlimited)
     # max_message_len = 1000000
   
     ## Data format to consume
     ## Each data format has its own unique set of configuration options, read
     ## more about them here:
     ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
     data_format = "json_v2"
   
     ## JSON v2 configuration
     ## Documentation: https://github.com/influxdata/telegraf/tree/master/plugins/parsers/json_v2
     [[inputs.kafka_consumer.json_v2]]
       ## Measurement name configuration
       measurement_name = "root.test.d2"
       
       ## Timestamp configuration
       timestamp_path = "timestamp"
       timestamp_format = "unix_ms"
       # timestamp_timezone = "UTC"
       
       ## Object configuration - parse the entire JSON object
       [[inputs.kafka_consumer.json_v2.object]]
         path = "@this"  ## Use the entire JSON object
         ## This will automatically detect and parse all fields from the JSON
         ## Numeric values become fields, strings can be configured as tags or fields
         excluded_keys = ["timestamp"]  ## Exclude timestamp from being parsed as a field
   
   
   
   ###############################################################################
   #                            OUTPUT PLUGINS                                   #
   ###############################################################################
   
   # Save metrics to an IoTDB Database
   [[outputs.iotdb]]
     ## Configuration of IoTDB server connection
     host = "localhost"
     port = "6667"
   
     ## Configuration of authentication
     user = "root"
     password = "root"
   
     ## Timeout to open a new session.
     ## A value of zero means no timeout.
     timeout = "5s"
   
     ## Configuration of type conversion for 64-bit unsigned int
     ## IoTDB currently DOES NOT support unsigned integers (version 13.x).
     ## 32-bit unsigned integers are safely converted into 64-bit signed integers by the plugin,
     ## however, this is not true for 64-bit values in general as overflows may occur.
     ## The following setting allows to specify the handling of 64-bit unsigned integers.
     ## Available values are:
     ##   - "int64"       --  convert to 64-bit signed integers and accept overflows
     ##   - "int64_clip"  --  convert to 64-bit signed integers and clip the values on overflow to 9,223,372,036,854,775,807
     ##   - "text"        --  convert to the string representation of the value
     uint64_conversion = "int64_clip"
   
     ## Configuration of TimeStamp
     ## TimeStamp is always saved in 64bits int. timestamp_precision specifies the unit of timestamp.
     ## Available value:
     ## "second", "millisecond", "microsecond", "nanosecond"(default)
     timestamp_precision = "millisecond"
   ```
   
   The issue with `telegraf` is that it doesn't natively support clustering 
and streaming the way SeaTunnel does.
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

