This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git


The following commit(s) were added to refs/heads/master by this push:
     new a0dcdcf  STREAMS-633: kafka config
     new 0157972  Merge pull request #474 from steveblackmon/STREAMS-633
a0dcdcf is described below

commit a0dcdcf0d9c99478ae45e5251a1feea6b2b5a86f
Author: Steve Blackmon <[email protected]>
AuthorDate: Wed Jan 9 19:02:45 2019 -0600

    STREAMS-633: kafka config
    
    resolves STREAMS-633
---
 streams-contrib/streams-persist-kafka/pom.xml      |  4 +--
 .../apache/streams/kafka/KafkaPersistReader.java   |  8 ++---
 .../apache/streams/kafka/KafkaPersistWriter.java   |  8 ++---
 .../apache/streams/kafka/KafkaConfiguration.json   | 14 +-------
 .../streams/kafka/KafkaReaderConfiguration.json    | 40 ++++++++++++++++++++++
 ...guration.json => KafkaWriterConfiguration.json} | 10 +-----
 6 files changed, 52 insertions(+), 32 deletions(-)

diff --git a/streams-contrib/streams-persist-kafka/pom.xml 
b/streams-contrib/streams-persist-kafka/pom.xml
index 6532aa4..3d974eb 100644
--- a/streams-contrib/streams-persist-kafka/pom.xml
+++ b/streams-contrib/streams-persist-kafka/pom.xml
@@ -30,8 +30,8 @@
     <description>Kafka Module</description>
 
     <properties>
-        <scala.version>2.9.2</scala.version>
-        <kafka.version>0.8.0</kafka.version>
+        <scala.version>2.11</scala.version>
+        <kafka.version>1.0.0</kafka.version>
         <clojure.version>1.2.0</clojure.version>
     </properties>
 
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index e17303a..d5377d9 100644
--- 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -59,7 +59,7 @@ public class KafkaPersistReader implements 
StreamsPersistReader, Serializable {
 
   private ObjectMapper mapper = new ObjectMapper();
 
-  private KafkaConfiguration config;
+  private KafkaReaderConfiguration config;
 
   private ConsumerConnector consumerConnector;
 
@@ -71,7 +71,7 @@ public class KafkaPersistReader implements 
StreamsPersistReader, Serializable {
    * KafkaPersistReader constructor - resolves KafkaConfiguration from JVM 
'kafka'.
    */
   public KafkaPersistReader() {
-    this.config = new 
ComponentConfigurator<>(KafkaConfiguration.class).detectConfiguration();
+    this.config = new 
ComponentConfigurator<>(KafkaReaderConfiguration.class).detectConfiguration();
     this.persistQueue  = new ConcurrentLinkedQueue<>();
   }
 
@@ -79,11 +79,11 @@ public class KafkaPersistReader implements 
StreamsPersistReader, Serializable {
    * KafkaPersistReader constructor - uses supplied persistQueue.
    */
   public KafkaPersistReader(Queue<StreamsDatum> persistQueue) {
-    this.config = new 
ComponentConfigurator<>(KafkaConfiguration.class).detectConfiguration();
+    this.config = new 
ComponentConfigurator<>(KafkaReaderConfiguration.class).detectConfiguration();
     this.persistQueue = persistQueue;
   }
 
-  public void setConfig(KafkaConfiguration config) {
+  public void setConfig(KafkaReaderConfiguration config) {
     this.config = config;
   }
 
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 25fc030..9ea985d 100644
--- 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -50,7 +50,7 @@ public class KafkaPersistWriter implements 
StreamsPersistWriter, Serializable, R
 
   private ObjectMapper mapper = new ObjectMapper();
 
-  private KafkaConfiguration config;
+  private KafkaWriterConfiguration config;
 
   private Producer<String, String> producer;
 
@@ -58,7 +58,7 @@ public class KafkaPersistWriter implements 
StreamsPersistWriter, Serializable, R
    * KafkaPersistWriter constructor
    */
   public KafkaPersistWriter() {
-    this.config = new 
ComponentConfigurator<>(KafkaConfiguration.class).detectConfiguration();
+    this.config = new 
ComponentConfigurator<>(KafkaWriterConfiguration.class).detectConfiguration();
     this.persistQueue  = new ConcurrentLinkedQueue<>();
   }
 
@@ -66,11 +66,11 @@ public class KafkaPersistWriter implements 
StreamsPersistWriter, Serializable, R
    * KafkaPersistWriter constructor - uses supplied persistQueue.
    */
   public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) {
-    this.config = new 
ComponentConfigurator<>(KafkaConfiguration.class).detectConfiguration();
+    this.config = new 
ComponentConfigurator<>(KafkaWriterConfiguration.class).detectConfiguration();
     this.persistQueue = persistQueue;
   }
 
-  public void setConfig(KafkaConfiguration config) {
+  public void setConfig(KafkaWriterConfiguration config) {
     this.config = config;
   }
 
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
 
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
index 88e5554..f44d1c6 100644
--- 
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
+++ 
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
@@ -10,19 +10,7 @@
     "properties": {
         "brokerlist": {
             "type": "string",
-            "description": "A comma-delimited list of broker nodes, used by 
producer"
-        },
-        "zkconnect": {
-            "type": "string",
-            "description": "A comma-delimited list of zookeeper host:ports, 
used by consumer"
-        },
-        "topic": {
-            "type": "string",
-            "description": "A topic to read/write from"
-        },
-        "groupId": {
-            "type": "string",
-            "description": "A required field for partitioning distributed 
consumers"
+            "description": "A comma-delimited list of broker nodes"
         }
     }
 }
\ No newline at end of file
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaReaderConfiguration.json
 
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaReaderConfiguration.json
new file mode 100644
index 0000000..f8d2672
--- /dev/null
+++ 
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaReaderConfiguration.json
@@ -0,0 +1,40 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema";,
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0";
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.kafka.KafkaReaderConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "brokerlist": {
+      "type": "string",
+      "description": "A comma-delimited list of broker nodes, used by producer"
+    },
+    "zkconnect": {
+      "type": "string",
+      "description": "A comma-delimited list of zookeeper host:ports, used by 
consumer"
+    },
+    "topic": {
+      "type": "string",
+      "description": "A topic to read/write from"
+    },
+    "group": {
+      "type": "string",
+      "description": "A required field for partitioning distributed consumers"
+    },
+    "start_from": {
+      "type": "string",
+      "enum": [
+        "earliest",
+        "latest",
+        "group_offsets",
+        "timestamp"
+      ]
+    },
+    "start_from_timestamp": {
+      "type": "long"
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
 
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaWriterConfiguration.json
similarity index 58%
copy from 
streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
copy to 
streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaWriterConfiguration.json
index 88e5554..c4185e0 100644
--- 
a/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaConfiguration.json
+++ 
b/streams-contrib/streams-persist-kafka/src/main/jsonschema/org/apache/streams/kafka/KafkaWriterConfiguration.json
@@ -5,24 +5,16 @@
     ],
     "id": "#",
     "type": "object",
-    "javaType" : "org.apache.streams.kafka.KafkaConfiguration",
+    "javaType" : "org.apache.streams.kafka.KafkaWriterConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
         "brokerlist": {
             "type": "string",
             "description": "A comma-delimited list of broker nodes, used by 
producer"
         },
-        "zkconnect": {
-            "type": "string",
-            "description": "A comma-delimited list of zookeeper host:ports, 
used by consumer"
-        },
         "topic": {
             "type": "string",
             "description": "A topic to read/write from"
-        },
-        "groupId": {
-            "type": "string",
-            "description": "A required field for partitioning distributed 
consumers"
         }
     }
 }
\ No newline at end of file

Reply via email to