This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 48dbad00b18c59c70bbe5e94160ec7a1f04b4c6a Author: Andrey Skorikov <andrey.skori...@codecentric.de> AuthorDate: Fri Oct 12 13:22:02 2018 +0200 allow multiple connections in kafka connector --- integrations/apache-kafka/config/source.properties | 3 +- .../apache/plc4x/kafka/Plc4xSourceConnector.java | 45 +++++++++++----------- .../org/apache/plc4x/kafka/Plc4xSourceTask.java | 30 ++++++++++++--- 3 files changed, 48 insertions(+), 30 deletions(-) diff --git a/integrations/apache-kafka/config/source.properties b/integrations/apache-kafka/config/source.properties index cbd00f5..afa7e93 100644 --- a/integrations/apache-kafka/config/source.properties +++ b/integrations/apache-kafka/config/source.properties @@ -19,6 +19,5 @@ limitations under the License. name=plc-source-test connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector topic=test -url=test:unused -queries=RANDOM/foo:INTEGER,RANDOM/bar:STRING +queries=test:unused#RANDOM/foo:INTEGER,test:another#RANDOM/bar:STRING rate=2000 \ No newline at end of file diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java index 4d014a5..bb1392e 100644 --- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java +++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java @@ -22,36 +22,28 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; -import org.apache.kafka.connect.util.ConnectorUtils; import org.apache.plc4x.kafka.util.VersionUtil; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.stream.Collectors; public class Plc4xSourceConnector extends SourceConnector { - static final String TOPIC_CONFIG = "topic"; + private static final String TOPIC_CONFIG = "topic"; private static final String TOPIC_DOC = "Kafka topic to publish to"; - static final String URL_CONFIG = "url"; - private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC"; - - static final String QUERIES_CONFIG = "queries"; + private static final String QUERIES_CONFIG = "queries"; private static final String QUERIES_DOC = "Field queries to be sent to the PLC"; - static final String RATE_CONFIG = "rate"; + private static final String RATE_CONFIG = "rate"; private static final Integer RATE_DEFAULT = 1000; private static final String RATE_DOC = "Polling rate"; - static final ConfigDef CONFIG_DEF = new ConfigDef() + private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) - .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC) .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC) .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC); private String topic; - private String url; private List<String> queries; private Integer rate; @@ -63,23 +55,30 @@ public class Plc4xSourceConnector extends SourceConnector { @Override public List<Map<String, String>> taskConfigs(int maxTasks) { List<Map<String, String>> configs = new LinkedList<>(); - List<List<String>> queryGroups = ConnectorUtils.groupPartitions(queries, maxTasks); - for (List<String> queryGroup: queryGroups) { + Map<String, List<String>> groupedByHost = new HashMap<>(); + queries.stream().map(query -> query.split("#", 2)).collect(Collectors.groupingBy(parts -> parts[0])).forEach((host, queries) -> { + groupedByHost.put(host, queries.stream().map(parts -> parts[1]).collect(Collectors.toList())); + }); + if (groupedByHost.size() > maxTasks) { + // Not enough tasks + // TODO: throw exception? + return Collections.emptyList(); + } + groupedByHost.forEach((host, qs) -> { Map<String, String> taskConfig = new HashMap<>(); - taskConfig.put(TOPIC_CONFIG, topic); - taskConfig.put(URL_CONFIG, url); - taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup)); - taskConfig.put(RATE_CONFIG, rate.toString()); + taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic); + taskConfig.put(Plc4xSourceTask.URL_CONFIG, host); + taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", qs)); + taskConfig.put(Plc4xSourceTask.RATE_CONFIG, rate.toString()); configs.add(taskConfig); - } + }); return configs; } @Override public void start(Map<String, String> props) { - AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props); + AbstractConfig config = new AbstractConfig(CONFIG_DEF, props); topic = config.getString(TOPIC_CONFIG); - url = config.getString(URL_CONFIG); queries = config.getList(QUERIES_CONFIG); rate = config.getInt(RATE_CONFIG); } diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java index 2bbc56b..f172a38 100644 --- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java +++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java @@ -19,6 +19,7 @@ under the License. package org.apache.plc4x.kafka; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -43,6 +44,25 @@ import java.util.concurrent.*; * If the flag does not become true, the method returns null, otherwise a fetch is performed. */ public class Plc4xSourceTask extends SourceTask { + static final String TOPIC_CONFIG = "topic"; + private static final String TOPIC_DOC = "Kafka topic to publish to"; + + static final String URL_CONFIG = "url"; + private static final String URL_DOC = "PLC URL"; + + static final String QUERIES_CONFIG = "queries"; + private static final String QUERIES_DOC = "Field queries to be sent to the PLC"; + + static final String RATE_CONFIG = "rate"; + private static final Integer RATE_DEFAULT = 1000; + private static final String RATE_DOC = "Polling rate"; + + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) + .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC) + .define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC) + .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC); + private final static long WAIT_LIMIT_MILLIS = 100; private final static long TIMEOUT_LIMIT_MILLIS = 5000; @@ -72,10 +92,10 @@ public class Plc4xSourceTask extends SourceTask { @Override public void start(Map<String, String> props) { - AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props); - topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG); - url = config.getString(Plc4xSourceConnector.URL_CONFIG); - queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG); + AbstractConfig config = new AbstractConfig(CONFIG_DEF, props); + topic = config.getString(TOPIC_CONFIG); + url = config.getString(URL_CONFIG); + queries = config.getList(QUERIES_CONFIG); openConnection(); @@ -83,7 +103,7 @@ public class Plc4xSourceTask extends SourceTask { throw new ConnectException("Reading not supported on this connection"); } - int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG)); + int rate = Integer.valueOf(props.get(RATE_CONFIG)); scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(Plc4xSourceTask.this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS); }