asfgit closed pull request #17: Add support for multiple queries in kafka source connector
URL: https://github.com/apache/incubator-plc4x/pull/17
This is a foreign pull request: it was merged from a forked repository.
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 45ae926eb..189920886 100644
---
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
package org.apache.plc4x.kafka;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
@@ -35,7 +36,7 @@ Licensed to the Apache Software Foundation (ASF) under one
static final String QUERY_CONFIG = "query";
private static final String QUERY_DOC = "Field query to be sent to the
PLC";
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
URL_DOC)
.define(QUERY_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, QUERY_DOC);
@@ -59,8 +60,9 @@ Licensed to the Apache Software Foundation (ASF) under one
@Override
public void start(Map<String, String> props) {
- url = props.get(URL_CONFIG);
- query = props.get(QUERY_CONFIG);
+ AbstractConfig config = new
AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+ url = config.getString(URL_CONFIG);
+ query = config.getString(QUERY_CONFIG);
}
@Override
diff --git
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index b29418f18..a54d5b08b 100644
---
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
package org.apache.plc4x.kafka;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -33,8 +34,6 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.concurrent.ExecutionException;
public class Plc4xSinkTask extends SinkTask {
- private final static String FIELD_KEY = "key"; // TODO: is this really
necessary?
-
private String url;
private String query;
@@ -48,8 +47,9 @@ public String version() {
@Override
public void start(Map<String, String> props) {
- url = props.get(Plc4xSinkConnector.URL_CONFIG);
- query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
+ AbstractConfig config = new
AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+ url = config.getString(Plc4xSinkConnector.URL_CONFIG);
+ query = config.getString(Plc4xSinkConnector.QUERY_CONFIG);
openConnection();
@@ -66,7 +66,7 @@ public void stop() {
public void put(Collection<SinkRecord> records) {
for (SinkRecord record: records) {
String value = record.value().toString(); // TODO: implement other
data types
- PlcWriteRequest plcRequest =
plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
+ PlcWriteRequest plcRequest =
plcWriter.writeRequestBuilder().addItem(query, query, value).build();
doWrite(plcRequest);
}
}
diff --git
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d1d9d026..4d014a535 100644
---
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -18,13 +18,15 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
package org.apache.plc4x.kafka;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
import org.apache.plc4x.kafka.util.VersionUtil;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -35,22 +37,22 @@ Licensed to the Apache Software Foundation (ASF) under one
static final String URL_CONFIG = "url";
private static final String URL_DOC = "Connection string used by PLC4X to
connect to the PLC";
- static final String QUERY_CONFIG = "query";
- private static final String QUERY_DOC = "Field query to be sent to the
PLC";
+ static final String QUERIES_CONFIG = "queries";
+ private static final String QUERIES_DOC = "Field queries to be sent to the
PLC";
static final String RATE_CONFIG = "rate";
private static final Integer RATE_DEFAULT = 1000;
private static final String RATE_DOC = "Polling rate";
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TOPIC_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
URL_DOC)
- .define(QUERY_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, QUERY_DOC)
+ .define(QUERIES_CONFIG, ConfigDef.Type.LIST,
ConfigDef.Importance.HIGH, QUERIES_DOC)
.define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT,
ConfigDef.Importance.MEDIUM, RATE_DOC);
private String topic;
private String url;
- private String query;
+ private List<String> queries;
private Integer rate;
@Override
@@ -60,22 +62,26 @@ Licensed to the Apache Software Foundation (ASF) under one
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
- Map<String, String> taskConfig = new HashMap<>();
- taskConfig.put(TOPIC_CONFIG, topic);
- taskConfig.put(URL_CONFIG, url);
- taskConfig.put(QUERY_CONFIG, query);
- taskConfig.put(RATE_CONFIG, rate.toString());
-
- // Only one task will be created; ignoring maxTasks for now
- return Collections.singletonList(taskConfig);
+ List<Map<String, String>> configs = new LinkedList<>();
+ List<List<String>> queryGroups =
ConnectorUtils.groupPartitions(queries, maxTasks);
+ for (List<String> queryGroup: queryGroups) {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(TOPIC_CONFIG, topic);
+ taskConfig.put(URL_CONFIG, url);
+ taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
+ taskConfig.put(RATE_CONFIG, rate.toString());
+ configs.add(taskConfig);
+ }
+ return configs;
}
@Override
public void start(Map<String, String> props) {
- topic = props.get(TOPIC_CONFIG);
- url = props.get(URL_CONFIG);
- query = props.get(QUERY_CONFIG);
- rate = Integer.valueOf(props.get(RATE_CONFIG));
+ AbstractConfig config = new
AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+ topic = config.getString(TOPIC_CONFIG);
+ url = config.getString(URL_CONFIG);
+ queries = config.getList(QUERIES_CONFIG);
+ rate = config.getInt(RATE_CONFIG);
}
@Override
diff --git
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 798ae3113..c354a1ea2 100644
---
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
package org.apache.plc4x.kafka;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@@ -32,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.plc4x.kafka.util.VersionUtil;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@@ -45,11 +47,10 @@ Licensed to the Apache Software Foundation (ASF) under one
public class Plc4xSourceTask extends SourceTask {
private final static long WAIT_LIMIT_MILLIS = 100;
private final static long TIMEOUT_LIMIT_MILLIS = 5000;
- private final static String FIELD_KEY = "key"; // TODO: is this really
necessary?
private String topic;
private String url;
- private String query;
+ private List<String> queries;
private PlcConnection plcConnection;
private PlcReader plcReader;
@@ -67,16 +68,22 @@ public String version() {
@Override
public void start(Map<String, String> props) {
- topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
- url = props.get(Plc4xSourceConnector.URL_CONFIG);
- query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+ AbstractConfig config = new
AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
+ topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
+ url = config.getString(Plc4xSourceConnector.URL_CONFIG);
+ queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
openConnection();
plcReader = plcConnection.getReader()
.orElseThrow(() -> new ConnectException("PlcReader not available
for this type of connection"));
- plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY,
query).build();
+
+ PlcReadRequest.Builder builder = plcReader.readRequestBuilder();
+ for (String query : queries) {
+ builder.addItem(query, query);
+ }
+ plcRequest = builder.build();
int rate =
Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
scheduler = Executors.newScheduledThreadPool(1);
@@ -152,30 +159,35 @@ private synchronized boolean awaitFetch(long
milliseconds) throws InterruptedExc
}
private List<SourceRecord> extractValues(PlcReadResponse<?> response) {
- final PlcResponseCode rc = response.getResponseCode(FIELD_KEY);
-
- if (!rc.equals(PlcResponseCode.OK))
- return null; // TODO: should we really ignore this?
-
- Object rawValue = response.getObject(FIELD_KEY);
- Schema valueSchema = getSchema(rawValue.getClass());
- Object value = valueSchema.equals(Schema.STRING_SCHEMA) ?
rawValue.toString() : rawValue;
- Long timestamp = System.currentTimeMillis();
- Map<String, String> sourcePartition = Collections.singletonMap("url",
url);
- Map<String, Long> sourceOffset = Collections.singletonMap("offset",
timestamp);
-
- SourceRecord record =
- new SourceRecord(
- sourcePartition,
- sourceOffset,
- topic,
- Schema.STRING_SCHEMA,
- query,
- valueSchema,
- value
- );
-
- return Collections.singletonList(record); // TODO: what if there are
multiple values?
+ final List<SourceRecord> result = new LinkedList<>();
+ for (String query : queries) {
+ final PlcResponseCode rc = response.getResponseCode(query);
+ if (!rc.equals(PlcResponseCode.OK)) {
+ continue;
+ }
+
+ Object rawValue = response.getObject(query);
+ Schema valueSchema = getSchema(rawValue.getClass());
+ Object value = valueSchema.equals(Schema.STRING_SCHEMA) ?
rawValue.toString() : rawValue;
+ Long timestamp = System.currentTimeMillis();
+ Map<String, String> sourcePartition =
Collections.singletonMap("url", url);
+ Map<String, Long> sourceOffset =
Collections.singletonMap("offset", timestamp);
+
+ SourceRecord record =
+ new SourceRecord(
+ sourcePartition,
+ sourceOffset,
+ topic,
+ Schema.STRING_SCHEMA,
+ query,
+ valueSchema,
+ value
+ );
+
+ result.add(record);
+ }
+
+ return result;
}
private Schema getSchema(Class<?> type) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services