This is an automated email from the ASF dual-hosted git repository.
sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git
The following commit(s) were added to refs/heads/master by this push:
new a0dcdcf STREAMS-633: kafka config
new 0157972 Merge pull request #474 from steveblackmon/STREAMS-633
a0dcdcf is described below
commit a0dcdcf0d9c99478ae45e5251a1feea6b2b5a86f
Author: Steve Blackmon <[email protected]>
AuthorDate: Wed Jan 9 19:02:45 2019 -0600
STREAMS-633: kafka config
resolves STREAMS-633
---
streams-contrib/streams-persist-kafka/pom.xml | 4 +--
.../apache/streams/kafka/KafkaPersistReader.java | 8 ++---
.../apache/streams/kafka/KafkaPersistWriter.java | 8 ++---
.../apache/streams/kafka/KafkaConfiguration.json | 14 +-------
.../streams/kafka/KafkaReaderConfiguration.json | 40 ++++++++++++++++++++++
...guration.json => KafkaWriterConfiguration.json} | 10 +-----
6 files changed, 52 insertions(+), 32 deletions(-)
diff --git a/streams-contrib/streams-persist-kafka/pom.xml
b/streams-contrib/streams-persist-kafka/pom.xml
index 6532aa4..3d974eb 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -30,8 +30,8 @@
<description>Kafka Module</description>
<properties>
- <scala.version>2.9.2</scala.version>
- <kafka.version>0.8.0</kafka.version>
+ <scala.version>2.11</scala.version>
+ <kafka.version>1.0.0</kafka.version>
<clojure.version>1.2.0</clojure.version>
</properties>
diff --git
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index e17303a..d5377d9 100644
---
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -59,7 +59,7 @@ public class KafkaPersistReader implements
StreamsPersistReader, Serializable {
private ObjectMapper mapper = new ObjectMapper();
- private KafkaConfiguration config;
+ private KafkaReaderConfiguration config;
private ConsumerConnector consumerConnector;
@@ -71,7 +71,7 @@ public class KafkaPersistReader implements
StreamsPersistReader, Serializable {
* KafkaPersistReader constructor - resolves KafkaConfiguration from JVM
'kafka'.
*/
public KafkaPersistReader() {
- this.config = new
ComponentConfigurator<>(KafkaConfiguration.class).detectConfiguration();
+ this.config = new
ComponentConfigurator<>(KafkaReaderConfiguration.class).detectConfiguration();
this.persistQueue = new ConcurrentLinkedQueue<>();
}
@@ -79,11 +79,11 @@ public class KafkaPersistReader implements
StreamsPersistReader, Serializable {
* KafkaPersistReader constructor - uses supplied persistQueue.
*/
public KafkaPersistReader(Queue<StreamsDatum> persistQueue) {
- this.config = new
ComponentConfigurator<>(KafkaConfiguration.class).detectConfiguration();
+ this.config = new
ComponentConfigurator<>(KafkaReaderConfiguration.class).detectConfiguration();
this.persistQueue = persistQueue;
}
- public void setConfig(KafkaConfiguration config) {
+ public void setConfig(KafkaReaderConfiguration config) {
this.config = config;
}
diff --git
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 25fc030..9ea985d 100644
---
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -50,7 +50,7 @@ public class KafkaPersistWriter implements
StreamsPersistWriter, Serializable, R
private ObjectMapper mapper = new ObjectMapper();
- private KafkaConfiguration config;
+ private KafkaWriterConfiguration config;
private Producer<String, String> producer;
@@ -58,7 +58,7 @@ public class KafkaPersistWriter implements
StreamsPersistWriter, Serializable, R
* KafkaPersistWriter constructor
*/
public KafkaPersistWriter() {
- this.config = new
ComponentConfigurator<>(KafkaConfiguration.class).detectConfiguration();
+ this.config = new
ComponentConfigurator<>(KafkaWriterConfiguration.class).detectConfiguration();
this.persistQueue = new ConcurrentLinkedQueue<>();
}
@@ -66,11 +66,11 @@ public class KafkaPersistWriter implements
StreamsPersistWriter, Serializable, R
* KafkaPersistWriter constructor - uses supplied persistQueue.
*/
public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) {
- this.config = new
ComponentConfigurator<>(KafkaConfiguration.class).detectConfiguration();
+ this.config = new
ComponentConfigurator<>(KafkaWriterConfiguration.class).detectConfiguration();
this.persistQueue = persistQueue;
}
- public void setConfig(KafkaConfiguration config) {
+ public void setConfig(KafkaWriterConfiguration config) {
this.config = config;
}
diff --git
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
index 88e5554..f44d1c6 100644
---
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
+++
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
@@ -10,19 +10,7 @@
"properties": {
"brokerlist": {
"type": "string",
- "description": "A comma-delimited list of broker nodes, used by
producer"
- },
- "zkconnect": {
- "type": "string",
- "description": "A comma-delimited list of zookeeper host:ports,
used by consumer"
- },
- "topic": {
- "type": "string",
- "description": "A topic to read/write from"
- },
- "groupId": {
- "type": "string",
- "description": "A required field for partitioning distributed
consumers"
+ "description": "A comma-delimited list of broker nodes"
}
}
}
\ No newline at end of file
diff --git
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaReaderConfiguration.json
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaReaderConfiguration.json
new file mode 100644
index 0000000..f8d2672
--- /dev/null
+++
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaReaderConfiguration.json
@@ -0,0 +1,40 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.kafka.KafkaReaderConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "brokerlist": {
+ "type": "string",
+ "description": "A comma-delimited list of broker nodes, used by producer"
+ },
+ "zkconnect": {
+ "type": "string",
+ "description": "A comma-delimited list of zookeeper host:ports, used by
consumer"
+ },
+ "topic": {
+ "type": "string",
+ "description": "A topic to read/write from"
+ },
+ "group": {
+ "type": "string",
+ "description": "A required field for partitioning distributed consumers"
+ },
+ "start_from": {
+ "type": "string",
+ "enum": [
+ "earliest",
+ "latest",
+ "group_offsets",
+ "timestamp"
+ ]
+ },
+ "start_from_timestamp": {
+ "type": "long"
+ }
+ }
+}
\ No newline at end of file
diff --git
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaWriterConfiguration.json
similarity index 58%
copy from
streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
copy to
streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaWriterConfiguration.json
index 88e5554..c4185e0 100644
---
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
+++
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaWriterConfiguration.json
@@ -5,24 +5,16 @@
],
"id": "#",
"type": "object",
- "javaType" : "org.apache.streams.kafka.KafkaConfiguration",
+ "javaType" : "org.apache.streams.kafka.KafkaWriterConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
"brokerlist": {
"type": "string",
"description": "A comma-delimited list of broker nodes, used by
producer"
},
- "zkconnect": {
- "type": "string",
- "description": "A comma-delimited list of zookeeper host:ports,
used by consumer"
- },
"topic": {
"type": "string",
"description": "A topic to read/write from"
- },
- "groupId": {
- "type": "string",
- "description": "A required field for partitioning distributed
consumers"
}
}
}
\ No newline at end of file