This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit fb5513807eec8658eaed12fdf4419ccc45d59c9c Author: Andrey Skorikov <andrey.skori...@codecentric.de> AuthorDate: Tue Sep 11 15:57:04 2018 +0200 added url to source key schema --- .../org/apache/plc4x/kafka/Plc4xSourceTask.java | 65 ++++++++++++++++------ .../org/apache/plc4x/java/test/TestDevice.java | 42 +++++++++++--- 2 files changed, 82 insertions(+), 25 deletions(-) diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java index c354a1e..7d0ed86 100644 --- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java +++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java @@ -20,6 +20,8 @@ package org.apache.plc4x.kafka; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -32,10 +34,7 @@ import org.apache.plc4x.java.api.messages.PlcReadResponse; import org.apache.plc4x.java.api.types.PlcResponseCode; import org.apache.plc4x.kafka.util.VersionUtil; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.*; /** @@ -48,6 +47,15 @@ public class Plc4xSourceTask extends SourceTask { private final static long WAIT_LIMIT_MILLIS = 100; private final static long TIMEOUT_LIMIT_MILLIS = 5000; + private final static String URL_FIELD = "url"; + private final static String QUERY_FIELD = "query"; + + private final static Schema KEY_SCHEMA = + new SchemaBuilder(Schema.Type.STRUCT) + .field(URL_FIELD, Schema.STRING_SCHEMA) + .field(QUERY_FIELD, Schema.STRING_SCHEMA) + .build(); + private String topic; private String url; private List<String> queries; @@ -56,6 +64,8 @@ public class Plc4xSourceTask extends SourceTask { private PlcReader plcReader; private PlcReadRequest plcRequest; + + // TODO: should we use shared (static) thread pool for this? private ScheduledExecutorService scheduler; private ScheduledFuture<?> timer; @@ -166,11 +176,16 @@ public class Plc4xSourceTask extends SourceTask { continue; } - Object rawValue = response.getObject(query); - Schema valueSchema = getSchema(rawValue.getClass()); - Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue; + Struct key = new Struct(KEY_SCHEMA) + .put(URL_FIELD, url) + .put(QUERY_FIELD, query); + + Object value = response.getObject(query); + Schema valueSchema = getSchema(value); Long timestamp = System.currentTimeMillis(); - Map<String, String> sourcePartition = Collections.singletonMap("url", url); + Map<String, String> sourcePartition = new HashMap<>(); + sourcePartition.put("url", url); + sourcePartition.put("query", query); Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp); SourceRecord record = @@ -178,8 +193,8 @@ public class Plc4xSourceTask extends SourceTask { sourcePartition, sourceOffset, topic, - Schema.STRING_SCHEMA, - query, + KEY_SCHEMA, + key, valueSchema, value ); @@ -190,20 +205,38 @@ public class Plc4xSourceTask extends SourceTask { return result; } - private Schema getSchema(Class<?> type) { - if (type.equals(Byte.class)) + private Schema getSchema(Object value) { + Objects.requireNonNull(value); + + if (value instanceof Byte) return Schema.INT8_SCHEMA; - if (type.equals(Short.class)) + if (value instanceof Short) return Schema.INT16_SCHEMA; - if (type.equals(Integer.class)) + if (value instanceof Integer) return Schema.INT32_SCHEMA; - if (type.equals(Long.class)) + if (value instanceof Long) return Schema.INT64_SCHEMA; - return Schema.STRING_SCHEMA; // default case; invoke .toString on value + if (value instanceof Float) + return Schema.FLOAT32_SCHEMA; + + if (value instanceof Double) + return Schema.FLOAT64_SCHEMA; + + if (value instanceof Boolean) + return Schema.BOOLEAN_SCHEMA; + + if (value instanceof String) + return Schema.STRING_SCHEMA; + + if (value instanceof byte[]) + return Schema.BYTES_SCHEMA; + + // TODO: add support for collective and complex types + throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName())); } } \ No newline at end of file diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java index 3966deb..3fcfc1d 100644 --- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java +++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java @@ -20,6 +20,7 @@ package org.apache.plc4x.java.test; import org.apache.plc4x.java.base.messages.items.FieldItem; +import java.lang.reflect.Array; import java.util.*; /** @@ -69,22 +70,45 @@ class TestDevice { private FieldItem randomValue(Class<?> type) { Object result = null; - // TODO: implement for further data types + if (type.equals(Byte.class)) + result = (byte) random.nextInt(1 << 8); - if (type == Integer.class) + if (type.equals(Short.class)) + result = (short) random.nextInt(1 << 16); + + if (type.equals(Integer.class)) result = random.nextInt(); - if (type == Byte.class) { - byte[] bytes = new byte[1]; - random.nextBytes(bytes); - result = bytes[0]; + if (type.equals(Long.class)) + result = random.nextLong(); + + if (type.equals(Float.class)) + result = random.nextFloat(); + + if (type.equals(Double.class)) + result = random.nextDouble(); + + if (type.equals(Boolean.class)) + result = random.nextBoolean(); + + if (type.equals(String.class)) { + int length = random.nextInt(100); + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + char c = (char)('a' + random.nextInt(26)); + sb.append(c); + } + result = sb.toString(); } - if (type == Short.class) { - result = random.nextInt(1 << 16); + if (type.equals(byte[].class)) { + int length = random.nextInt(100); + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + result = bytes; } - return new TestFieldItem(new Object[]{result}); + return new TestFieldItem(new Object[] { result }); } @Override