Hi y'all, I have a question regarding your Kafka Source Connector. It seems as if the Connector does not execute the jobs. Ill give you some code and thoughts and I hope you can tell me whether the mistake is on my side or that might be a bug. So this is the current kafka connect config file:
{ "config": { "name": "plc4x-source-sensors-raw-data", "connector.class": "org.apache.plc4x.kafka.Plc4xSourceConnector", "default.topic": "default_topic", "tasks.max": "1", "errors.log.enable": "true", "errors.log.include.messages": "true", "sources": "machine", "sources.machine.connectionString": "opcua:tcp://10.202.70.13:4840?discovery=false", "sources.machine.pollReturnInterval": "5000", "sources.machine.bufferSize": "1000", "sources.machine.jobReferences": "sensors", "sources.machine.jobReferences.sensors.topic": "test_plc_4x", "jobs": "sensors", "jobs.sensors.interval": "1000", "jobs.sensors.tags": "test1,test2,test3,test4", "jobs.sensors.tags.test1": "ns=2;s=cfl.FV.rSupCircCurr;REAL", "jobs.sensors.tags.test2": "ns=2;s=cfl.FV.rSupCircCounter;REAL", "jobs.sensors.tags.test3": "ns=2;s=wir.FV.rWRTemp;REAL", "jobs.sensors.tags.test4": "ns=2;s=wir.FV.rVerlegRePosition;REAL", "bootstrap.servers": "stargate-kafka:29092", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "key.converter.schemas.enable": "false", "offset.storage.file.filename":"/tmp/connect.offsets", "offset.flush.interval.ms":"10000", "enable.idempotence":"true", "acks":"all" }, "name": "plc4x-source-sensors-raw-data" } The log in my Kafka connect displays this message here: [2023-12-22 22:28:18,792] INFO WorkerSourceTask{id=plc4x-source-sensors-raw-data-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask) It seems as if there's a job being executed but in fact it's not. This is the code from the task: log.info("before the scraper"); scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> { try { Long timestamp = System.currentTimeMillis(); Map<String, String> sourcePartition = new HashMap<>(); sourcePartition.put("sourceName", sourceName); sourcePartition.put("jobName", jobName); Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp); log.info("step 1"); String topic = topics.get(jobName); // Prepare the key structure. Struct key = new Struct(KEY_SCHEMA) .put(Constants.SOURCE_NAME_FIELD, sourceName) .put(Constants.JOB_NAME_FIELD, jobName); // Build the Schema for the result struct. SchemaBuilder tagSchemaBuilder = SchemaBuilder.struct() .name("org.apache.plc4x.kafka.schema.Tag"); log.info("step 2"); for (Map.Entry<String, Object> result : results.entrySet()) { // Get tag-name and -value from the results. String tagName = result.getKey(); Object tagValue = result.getValue(); // Get the schema for the given value type. Schema valueSchema = getSchema(tagValue); // Add the schema description for the current tag. tagSchemaBuilder.field(tagName, valueSchema); } Schema tagSchema = tagSchemaBuilder.build(); log.info("step 3"); Schema recordSchema = SchemaBuilder.struct() .name("org.apache.plc4x.kafka.schema.JobResult") .doc("PLC Job result. This contains all of the received PLCValues as well as a received timestamp") .field(Constants.TAGS_CONFIG, tagSchema) .field(Constants.TIMESTAMP_CONFIG, Schema.INT64_SCHEMA) .field(Constants.EXPIRES_CONFIG, Schema.OPTIONAL_INT64_SCHEMA) .build(); // Build the struct itself. Struct tagStruct = new Struct(tagSchema); for (Map.Entry<String, Object> result : results.entrySet()) { // Get tag-name and -value from the results. String tagName = result.getKey(); Object tagValue = result.getValue(); if (tagSchema.field(tagName).schema().type() == Schema.Type.ARRAY) { tagStruct.put(tagName, ((List) tagValue).stream().map(p -> ((PlcValue) p).getObject()) .collect(Collectors.toList())); } else { tagStruct.put(tagName, tagValue); } } Struct recordStruct = new Struct(recordSchema) .put(Constants.TAGS_CONFIG, tagStruct) .put(Constants.TIMESTAMP_CONFIG, timestamp); // Prepare the source-record element. SourceRecord sourceRecord = new SourceRecord( sourcePartition, sourceOffset, topic, KEY_SCHEMA, key, recordSchema, recordStruct); log.info("Thats the record:", sourceRecord); // Add the new source-record to the buffer. buffer.add(sourceRecord); } catch (Exception e) { log.error("Error while parsing returned values", e); } }, triggerCollector); log.info("after the scraper"); I left some logging statements as you can see and only the statements before and after scraper are being executed and the steps in between when the scraper is started are not being displayed, so my assumption is that the ode which produces the actual message is not being executed or am I wrong here? For making sure it is not related to the plc and the tags, I used the read plc as you can see here: java -jar .\target\plc4j-examples-hello-world-plc4x-read-0.11.0-uber-jar.jar --connection-string "opcua:tcp://10.202.70.13:4840?discovery=false" --tag-addresses "ns=2;s=cfl.FV.rSupCircCurr;REAL" "ns=2;s=cfl.FV.rSupCircCounter;REAL" "ns=2;s=wir.FV.rWRTemp;REAL" "ns=2;s=wir.FV.rVerlegRePosition;REAL" 23:40:50.121 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead - Synchronous request ... 23:40:50.170 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead - Value[value-ns=2;s=cfl.FV.rSupCircCurr;REAL]: 0.046645667 23:40:50.171 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead - Value[value-ns=2;s=cfl.FV.rSupCircCounter;REAL]: 1911.0 23:40:50.171 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead - Value[value-ns=2;s=wir.FV.rWRTemp;REAL]: 23.61865 23:40:50.171 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead - Value[value-ns=2;s=wir.FV.rVerlegRePosition;REAL]: 235.443 and from my understanding of how the source connector works, it should be the same because you stated you wanna implement subscribe functionality next year. So it should use the same continuous read functionality, right? I hope that you can help me out. Thanks in advance for reading the mail and I hope you find the time to send me an answer. Best regards Peter Die vorangehende E-Mail-Nachricht, einschlie?lich aller Anlagen, kann vertrauliche Informationen beinhalten und ist nur f?r den oder die vorgesehenen Empf?nger bestimmt. Unberechtigter Zugriff, Nachbearbeitung, Verwendung, Freigabe, Verteilung oder Vervielf?ltigung der Inhalte dieser E-Mail ist strengstens untersagt. Wenn Sie nicht der beabsichtigte Empf?nger sind, kontaktieren Sie bitte sofort den Absender und l?schen Sie alle Kopien. The preceding e-mail message, including any attachments, may contain confidential information and is for the intended recipient(s) only. Unauthorized access, review, use, sharing, distribution or reproduction of the content of this e-mail is strictly prohibited. If you are not the intended recipient, please contact the sender immediately and delete all copies. The Mail was scanned by LAPMASTER WOLTERS Security System