Hi y'all,

I have a question regarding your Kafka Source Connector. It seems as if the 
connector does not execute the jobs. I'll share some code and my thoughts, and I 
hope you can tell me whether the mistake is on my side or whether this might be 
a bug. This is my current Kafka Connect config file:

{
  "config": {
    "name": "plc4x-source-sensors-raw-data",
    "connector.class": "org.apache.plc4x.kafka.Plc4xSourceConnector",
    "default.topic": "default_topic",
    "tasks.max": "1",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",

    "sources": "machine",
    "sources.machine.connectionString": "opcua:tcp://10.202.70.13:4840?discovery=false",
    "sources.machine.pollReturnInterval": "5000",
    "sources.machine.bufferSize": "1000",
    "sources.machine.jobReferences": "sensors",
    "sources.machine.jobReferences.sensors.topic": "test_plc_4x",

    "jobs": "sensors",
    "jobs.sensors.interval": "1000",
    "jobs.sensors.tags": "test1,test2,test3,test4",
    "jobs.sensors.tags.test1": "ns=2;s=cfl.FV.rSupCircCurr;REAL",
    "jobs.sensors.tags.test2": "ns=2;s=cfl.FV.rSupCircCounter;REAL",
    "jobs.sensors.tags.test3": "ns=2;s=wir.FV.rWRTemp;REAL",
    "jobs.sensors.tags.test4": "ns=2;s=wir.FV.rVerlegRePosition;REAL",

    "bootstrap.servers": "stargate-kafka:29092",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "offset.storage.file.filename":"/tmp/connect.offsets",
    "offset.flush.interval.ms":"10000",
    "enable.idempotence":"true",
    "acks":"all"
  },
  "name": "plc4x-source-sensors-raw-data"
}
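For completeness, I deploy this by posting it to the Connect worker's REST API, roughly like this (the worker host/port and the file name are placeholders for my setup):

```shell
# Submit the connector config (the JSON above, saved to a file) to the
# Kafka Connect worker's REST API. Host/port and file name are placeholders.
curl -s -X POST -H "Content-Type: application/json" \
  --data @plc4x-source-sensors-raw-data.json \
  http://localhost:8083/connectors
```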

The log of my Kafka Connect worker displays this message:
[2023-12-22 22:28:18,792] INFO 
WorkerSourceTask{id=plc4x-source-sensors-raw-data-0} Either no records were 
produced by the task since the last offset commit, or every record has been 
filtered out by a transformation or dropped due to transformation or conversion 
errors. (org.apache.kafka.connect.runtime.WorkerSourceTask)
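To rule out a crashed task, one can also query the connector and task state via the Connect REST API (host/port is again a placeholder); a FAILED task state with a stack trace would point to a startup error:

```shell
# Query connector and task state via the Kafka Connect REST API.
# A RUNNING task that still emits the message above suggests the
# task loop runs but no records reach the buffer.
curl -s http://localhost:8083/connectors/plc4x-source-sensors-raw-data/status
```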

So it looks as if a job is being executed, but in fact it is not producing 
anything. This is the code from the task:
log.info("before the scraper");
scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> {
    try {
        Long timestamp = System.currentTimeMillis();

        Map<String, String> sourcePartition = new HashMap<>();
        sourcePartition.put("sourceName", sourceName);
        sourcePartition.put("jobName", jobName);

        Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);

        log.info("step 1");

        String topic = topics.get(jobName);

        // Prepare the key structure.
        Struct key = new Struct(KEY_SCHEMA)
            .put(Constants.SOURCE_NAME_FIELD, sourceName)
            .put(Constants.JOB_NAME_FIELD, jobName);

        // Build the schema for the result struct.
        SchemaBuilder tagSchemaBuilder = SchemaBuilder.struct()
            .name("org.apache.plc4x.kafka.schema.Tag");

        log.info("step 2");

        for (Map.Entry<String, Object> result : results.entrySet()) {
            // Get tag name and value from the results.
            String tagName = result.getKey();
            Object tagValue = result.getValue();

            // Get the schema for the given value type.
            Schema valueSchema = getSchema(tagValue);

            // Add the schema description for the current tag.
            tagSchemaBuilder.field(tagName, valueSchema);
        }
        Schema tagSchema = tagSchemaBuilder.build();

        log.info("step 3");

        Schema recordSchema = SchemaBuilder.struct()
            .name("org.apache.plc4x.kafka.schema.JobResult")
            .doc("PLC job result. This contains all of the received PlcValues as well as a received timestamp.")
            .field(Constants.TAGS_CONFIG, tagSchema)
            .field(Constants.TIMESTAMP_CONFIG, Schema.INT64_SCHEMA)
            .field(Constants.EXPIRES_CONFIG, Schema.OPTIONAL_INT64_SCHEMA)
            .build();

        // Build the struct itself.
        Struct tagStruct = new Struct(tagSchema);
        for (Map.Entry<String, Object> result : results.entrySet()) {
            // Get tag name and value from the results.
            String tagName = result.getKey();
            Object tagValue = result.getValue();

            if (tagSchema.field(tagName).schema().type() == Schema.Type.ARRAY) {
                tagStruct.put(tagName, ((List) tagValue).stream()
                    .map(p -> ((PlcValue) p).getObject())
                    .collect(Collectors.toList()));
            } else {
                tagStruct.put(tagName, tagValue);
            }
        }

        Struct recordStruct = new Struct(recordSchema)
            .put(Constants.TAGS_CONFIG, tagStruct)
            .put(Constants.TIMESTAMP_CONFIG, timestamp);

        // Prepare the source-record element.
        SourceRecord sourceRecord = new SourceRecord(
            sourcePartition, sourceOffset,
            topic,
            KEY_SCHEMA, key,
            recordSchema, recordStruct);
        // Note: SLF4J needs a {} placeholder, otherwise the record is not logged.
        log.info("That's the record: {}", sourceRecord);

        // Add the new source record to the buffer.
        buffer.add(sourceRecord);
    } catch (Exception e) {
        log.error("Error while parsing returned values", e);
    }
}, triggerCollector);
log.info("after the scraper");

As you can see, I left some logging statements in. Only the statements before 
and after the scraper are executed; the "step" logs in between, which should 
appear once the scraper runs, are never displayed. So my assumption is that the 
code which produces the actual message is not being executed. Or am I wrong here?

To make sure it is not related to the PLC and the tags, I used the hello-world 
read example, as you can see here:
java -jar .\target\plc4j-examples-hello-world-plc4x-read-0.11.0-uber-jar.jar --connection-string "opcua:tcp://10.202.70.13:4840?discovery=false" --tag-addresses "ns=2;s=cfl.FV.rSupCircCurr;REAL" "ns=2;s=cfl.FV.rSupCircCounter;REAL" "ns=2;s=wir.FV.rWRTemp;REAL" "ns=2;s=wir.FV.rVerlegRePosition;REAL"
23:40:50.121 [main] INFO  o.a.p.j.e.h.read.HelloPlc4xRead - Synchronous request ...
23:40:50.170 [main] INFO  o.a.p.j.e.h.read.HelloPlc4xRead - Value[value-ns=2;s=cfl.FV.rSupCircCurr;REAL]: 0.046645667
23:40:50.171 [main] INFO  o.a.p.j.e.h.read.HelloPlc4xRead - Value[value-ns=2;s=cfl.FV.rSupCircCounter;REAL]: 1911.0
23:40:50.171 [main] INFO  o.a.p.j.e.h.read.HelloPlc4xRead - Value[value-ns=2;s=wir.FV.rWRTemp;REAL]: 23.61865
23:40:50.171 [main] INFO  o.a.p.j.e.h.read.HelloPlc4xRead - Value[value-ns=2;s=wir.FV.rVerlegRePosition;REAL]: 235.443

From my understanding of how the source connector works, it should behave the 
same, because you stated you want to implement subscribe functionality next 
year. So it should use the same continuous read functionality, right?

I hope you can help me out. Thanks in advance for reading this mail, and I hope 
you find the time to send me an answer.

Best regards

Peter





The preceding e-mail message, including any attachments, may contain 
confidential information and is for the intended recipient(s) only. 
Unauthorized access, review, use, sharing, distribution or reproduction of the 
content of this e-mail is strictly prohibited. If you are not the intended 
recipient, please contact the sender immediately and delete all copies.


