This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch featule/kafka-connect-refactoring in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 74c892cda34d2ac9d1f1b7eb94b2d55bfba60c6e Author: Christofer Dutz <[email protected]> AuthorDate: Sat Aug 24 19:36:58 2019 +0200 - Finished a first fully operational version of the Kafka Connect Source --- .../org/apache/plc4x/kafka/Plc4xSourceTask.java | 175 ++++++++++++--------- 1 file changed, 101 insertions(+), 74 deletions(-) diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java index 87ede4e..81b0d62 100644 --- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java +++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java @@ -27,7 +27,6 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.plc4x.java.PlcDriverManager; -import org.apache.plc4x.java.scraper.ResultHandler; import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder; import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl; import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder; @@ -40,6 +39,10 @@ import org.apache.plc4x.kafka.util.VersionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.*; import java.util.concurrent.*; @@ -76,21 +79,13 @@ public class Plc4xSourceTask extends SourceTask { */ private static final String SOURCE_NAME_FIELD = "source-name"; private static final String JOB_NAME_FIELD = "job-name"; - private static final String FIELD_NAME_FIELD = "field-name"; private static final Schema KEY_SCHEMA = new SchemaBuilder(Schema.Type.STRUCT) .field(SOURCE_NAME_FIELD, Schema.STRING_SCHEMA) .field(JOB_NAME_FIELD, Schema.STRING_SCHEMA) - .field(FIELD_NAME_FIELD, Schema.STRING_SCHEMA) .build(); - // Internal properties. - private Map<String, String> topics; - private PlcDriverManager plcDriverManager; - private TriggerCollector triggerCollector; - private TriggeredScraperImpl scraper; - // Internal buffer into which all incoming scraper responses are written to. private ArrayBlockingQueue<SourceRecord> buffer; @@ -104,7 +99,7 @@ public class Plc4xSourceTask extends SourceTask { AbstractConfig config = new AbstractConfig(CONFIG_DEF, props); String connectionName = config.getString(CONNECTION_NAME_CONFIG); String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG); - topics = new HashMap<>(); + Map<String, String> topics = new HashMap<>(); // Create a buffer with a capacity of 1000 elements which schedules access in a fair way. buffer = new ArrayBlockingQueue<>(1000, true); @@ -145,51 +140,58 @@ public class Plc4xSourceTask extends SourceTask { ScraperConfigurationTriggeredImpl scraperConfig = builder.build(); try { - plcDriverManager = new PooledPlcDriverManager(); - triggerCollector = new TriggerCollectorImpl(plcDriverManager); - scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() { - @Override - public void handle(String jobName, String sourceName, Map<String, Object> results) { - Long timestamp = System.currentTimeMillis(); - - Map<String, String> sourcePartition = new HashMap<>(); - sourcePartition.put("sourceName", sourceName); - sourcePartition.put("jobName", jobName); - - Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp); - - String topic = topics.get(jobName); - - for (Map.Entry<String, Object> result : results.entrySet()) { - // Get field-name and -value from the results. - String fieldName = result.getKey(); - Object fieldValue = result.getValue(); - - // Prepare the key structure. - Struct key = new Struct(KEY_SCHEMA) - .put(SOURCE_NAME_FIELD, sourceName) - .put(JOB_NAME_FIELD, jobName) - .put(FIELD_NAME_FIELD, fieldName); - - // Get the schema for the given value type. - Schema valueSchema = getSchema(fieldValue); - - // Prepare the source-record element. - SourceRecord record = - new SourceRecord( - sourcePartition, - sourceOffset, - topic, - KEY_SCHEMA, - key, - valueSchema, - fieldValue - ); - - // Add the new source-record to the buffer. - buffer.add(record); - } + PlcDriverManager plcDriverManager = new PooledPlcDriverManager(); + TriggerCollector triggerCollector = new TriggerCollectorImpl(plcDriverManager); + TriggeredScraperImpl scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> { + Long timestamp = System.currentTimeMillis(); + + Map<String, String> sourcePartition = new HashMap<>(); + sourcePartition.put("sourceName", sourceName); + sourcePartition.put("jobName", jobName); + + Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp); + + String topic = topics.get(jobName); + + // Prepare the key structure. + Struct key = new Struct(KEY_SCHEMA) + .put(SOURCE_NAME_FIELD, sourceName) + .put(JOB_NAME_FIELD, jobName); + + // Build the Schema for the result struct. + SchemaBuilder recordSchemaBuilder = SchemaBuilder.struct().name("org.apache.plc4x.kafka.JobResult"); + for (Map.Entry<String, Object> result : results.entrySet()) { + // Get field-name and -value from the results. + String fieldName = result.getKey(); + Object fieldValue = result.getValue(); + + // Get the schema for the given value type. + Schema valueSchema = getSchema(fieldValue); + + // Add the schema description for the current field. + recordSchemaBuilder.field(fieldName, valueSchema); + } + Schema recordSchema = recordSchemaBuilder.build(); + + // Build the struct itself. + Struct recordStruct = new Struct(recordSchema); + for (Map.Entry<String, Object> result : results.entrySet()) { + // Get field-name and -value from the results. + String fieldName = result.getKey(); + Object fieldValue = result.getValue(); + recordStruct.put(fieldName, fieldValue); } + + // Prepare the source-record element. + SourceRecord record = new SourceRecord( + sourcePartition, sourceOffset, + topic, + KEY_SCHEMA, key, + recordSchema, recordStruct + ); + + // Add the new source-record to the buffer. + buffer.add(record); }, triggerCollector); scraper.start(); triggerCollector.start(); @@ -221,33 +223,58 @@ public class Plc4xSourceTask extends SourceTask { private Schema getSchema(Object value) { Objects.requireNonNull(value); - if (value instanceof Byte) - return Schema.INT8_SCHEMA; - - if (value instanceof Short) - return Schema.INT16_SCHEMA; - - if (value instanceof Integer) - return Schema.INT32_SCHEMA; + if(value instanceof List) { + List list = (List) value; + if(list.isEmpty()) { + throw new ConnectException("Unsupported empty lists."); + } + // In PLC4X list elements all contain the same type. + Object firstElement = list.get(0); + Schema elementSchema = getSchema(firstElement); + return SchemaBuilder.array(elementSchema).build(); + } + if (value instanceof BigDecimal) { - if (value instanceof Long) - return Schema.INT64_SCHEMA; + } + if (value instanceof BigDecimal) { - if (value instanceof Float) + } + if (value instanceof Boolean) { + return Schema.BOOLEAN_SCHEMA; + } + if (value instanceof byte[]) { + return Schema.BYTES_SCHEMA; + } + if (value instanceof Byte) { + return Schema.INT8_SCHEMA; + } + if (value instanceof Double) { + return Schema.FLOAT64_SCHEMA; + } + if (value instanceof Float) { return Schema.FLOAT32_SCHEMA; + } + if (value instanceof Integer) { + return Schema.INT32_SCHEMA; + } + if (value instanceof LocalDate) { - if (value instanceof Double) - return Schema.FLOAT64_SCHEMA; + } + if (value instanceof LocalDateTime) { - if (value instanceof Boolean) - return Schema.BOOLEAN_SCHEMA; + } + if (value instanceof LocalTime) { - if (value instanceof String) + } + if (value instanceof Long) { + return Schema.INT64_SCHEMA; + } + if (value instanceof Short) { + return Schema.INT16_SCHEMA; + } + if (value instanceof String) { return Schema.STRING_SCHEMA; - - if (value instanceof byte[]) - return Schema.BYTES_SCHEMA; - + } // TODO: add support for collective and complex types throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName())); }
