This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eaecd64  Refactor pinot-connectors to break the dependencies from 
kafka 0.9 (#4445)
eaecd64 is described below

commit eaecd64e47d9090f7c37a2876fb5808cd3893739
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Jul 19 16:16:55 2019 -0700

    Refactor pinot-connectors to break the dependencies from kafka 0.9 (#4445)
    
    * Refactor pinot-connectors to break the dependencies from kafka 0.9
    
    * address comments
---
 pinot-common/pom.xml                               |   4 -
 pinot-connectors/pinot-connector-kafka-0.9/pom.xml |   7 +-
 .../realtime/impl/kafka/KafkaStarterUtils.java     | 171 ---------------------
 .../impl/kafka/server/KafkaDataProducer.java       |  57 +++++++
 .../kafka/server/KafkaDataServerStartable.java     | 109 +++++++++++++
 pinot-core/pom.xml                                 |   4 -
 .../core/realtime/stream/StreamDataProducer.java   |  35 +++++
 .../core/realtime/stream/StreamDataProvider.java   |  46 ++++++
 .../realtime/stream/StreamDataServerStartable.java |  56 +++++++
 .../function/FunctionExpressionEvaluatorTest.java  |   1 -
 pinot-integration-tests/pom.xml                    |   3 +-
 .../tests/BaseClusterIntegrationTest.java          |  24 +--
 .../ControllerPeriodicTasksIntegrationTests.java   |   2 +-
 .../tests/HybridClusterIntegrationTest.java        |   2 +-
 ...ridClusterIntegrationTestCommandLineRunner.java |  10 +-
 .../tests/RealtimeClusterIntegrationTest.java      |   2 +-
 pinot-perf/pom.xml                                 |   3 +-
 .../perf/BenchmarkRealtimeConsumptionSpeed.java    |   8 +-
 .../org/apache/pinot/perf/RealtimeStressTest.java  |   8 +-
 pinot-tools/pom.xml                                |   3 +-
 .../org/apache/pinot/tools/HybridQuickstart.java   |  38 +++--
 .../org/apache/pinot/tools/KafkaStarterUtils.java  | 103 +++++++++++++
 .../org/apache/pinot/tools/RealtimeQuickStart.java |  33 ++--
 .../tools/admin/command/StartKafkaCommand.java     |  14 +-
 .../admin/command/StreamAvroIntoKafkaCommand.java  |  22 ++-
 .../pinot/tools/streams/AirlineDataStream.java     |  19 +--
 .../pinot/tools/streams/MeetupRsvpStream.java      |  33 ++--
 pom.xml                                            |   9 +-
 28 files changed, 536 insertions(+), 290 deletions(-)

diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 2e27ac8..41c9b39 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -198,10 +198,6 @@
       <artifactId>jopt-simple</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
       <groupId>nl.jqno.equalsverifier</groupId>
       <artifactId>equalsverifier</artifactId>
       <scope>test</scope>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml 
b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index e8f6c93..0450102 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -42,7 +42,7 @@
     <!-- Kafka  -->
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_${kafka.scala.version}</artifactId>
+      <artifactId>kafka_2.10</artifactId>
       <version>${kafka.lib.version}</version>
       <exclusions>
         <exclusion>
@@ -63,5 +63,10 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.10.5</version>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
deleted file mode 100644
index 5f1de99..0000000
--- 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka;
-
-import java.io.File;
-import java.security.Permission;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import kafka.admin.TopicCommand;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-
-
-/**
- * Utilities to start Kafka during unit tests.
- *
- */
-public class KafkaStarterUtils {
-  public static final int DEFAULT_KAFKA_PORT = 19092;
-  public static final int DEFAULT_BROKER_ID = 0;
-  public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + 
"/kafka";
-  public static final String DEFAULT_KAFKA_BROKER = "localhost:" + 
DEFAULT_KAFKA_PORT;
-
-  public static Properties getDefaultKafkaConfiguration() {
-    final Properties configuration = new Properties();
-
-    // Enable topic deletion by default for integration tests
-    configureTopicDeletion(configuration, true);
-
-    // Set host name
-    configureHostName(configuration, "localhost");
-
-    return configuration;
-  }
-
-  public static List<KafkaServerStartable> startServers(final int brokerCount, 
final int port, final String zkStr,
-      final Properties configuration) {
-    List<KafkaServerStartable> startables = new ArrayList<>(brokerCount);
-
-    for (int i = 0; i < brokerCount; i++) {
-      startables.add(startServer(port + i, i, zkStr, "/tmp/kafka-" + 
Double.toHexString(Math.random()), configuration));
-    }
-
-    return startables;
-  }
-
-  public static KafkaServerStartable startServer(final int port, final int 
brokerId, final String zkStr,
-      final Properties configuration) {
-    return startServer(port, brokerId, zkStr, "/tmp/kafka-" + 
Double.toHexString(Math.random()), configuration);
-  }
-
-  public static KafkaServerStartable startServer(final int port, final int 
brokerId, final String zkStr,
-      final String logDirPath, final Properties configuration) {
-    // Create the ZK nodes for Kafka, if needed
-    int indexOfFirstSlash = zkStr.indexOf('/');
-    if (indexOfFirstSlash != -1) {
-      String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
-      String zkNodePath = zkStr.substring(indexOfFirstSlash);
-      ZkClient client = new ZkClient(bareZkUrl);
-      client.createPersistent(zkNodePath, true);
-      client.close();
-    }
-
-    File logDir = new File(logDirPath);
-    logDir.mkdirs();
-
-    configureKafkaPort(configuration, port);
-    configureZkConnectionString(configuration, zkStr);
-    configureBrokerId(configuration, brokerId);
-    configureKafkaLogDirectory(configuration, logDir);
-    configuration.put("zookeeper.session.timeout.ms", "60000");
-    KafkaConfig config = new KafkaConfig(configuration);
-
-    KafkaServerStartable serverStartable = new KafkaServerStartable(config);
-    serverStartable.startup();
-
-    return serverStartable;
-  }
-
-  public static void configureSegmentSizeBytes(Properties properties, int 
segmentSize) {
-    properties.put("log.segment.bytes", Integer.toString(segmentSize));
-  }
-
-  public static void configureLogRetentionSizeBytes(Properties properties, int 
logRetentionSizeBytes) {
-    properties.put("log.retention.bytes", 
Integer.toString(logRetentionSizeBytes));
-  }
-
-  public static void configureKafkaLogDirectory(Properties configuration, File 
logDir) {
-    configuration.put("log.dirs", logDir.getAbsolutePath());
-  }
-
-  public static void configureBrokerId(Properties configuration, int brokerId) 
{
-    configuration.put("broker.id", Integer.toString(brokerId));
-  }
-
-  public static void configureZkConnectionString(Properties configuration, 
String zkStr) {
-    configuration.put("zookeeper.connect", zkStr);
-  }
-
-  public static void configureKafkaPort(Properties configuration, int port) {
-    configuration.put("port", Integer.toString(port));
-  }
-
-  public static void configureTopicDeletion(Properties configuration, boolean 
topicDeletionEnabled) {
-    configuration.put("delete.topic.enable", 
Boolean.toString(topicDeletionEnabled));
-  }
-
-  public static void configureHostName(Properties configuration, String 
hostName) {
-    configuration.put("host.name", hostName);
-  }
-
-  public static void stopServer(KafkaServerStartable serverStartable) {
-    serverStartable.shutdown();
-    FileUtils.deleteQuietly(new 
File(serverStartable.serverConfig().logDirs().apply(0)));
-  }
-
-  public static void createTopic(String kafkaTopic, String zkStr, int 
partitionCount) {
-    invokeTopicCommand(
-        new String[]{"--create", "--zookeeper", zkStr, "--replication-factor", 
"1", "--partitions", Integer.toString(
-            partitionCount), "--topic", kafkaTopic});
-  }
-
-  private static void invokeTopicCommand(String[] args) {
-    // jfim: Use Java security to trap System.exit in Kafka 0.9's TopicCommand
-    System.setSecurityManager(new SecurityManager() {
-      @Override
-      public void checkPermission(Permission perm) {
-        if (perm.getName().startsWith("exitVM")) {
-          throw new SecurityException("System.exit is disabled");
-        }
-      }
-
-      @Override
-      public void checkPermission(Permission perm, Object context) {
-        checkPermission(perm);
-      }
-    });
-
-    try {
-      TopicCommand.main(args);
-    } catch (SecurityException ex) {
-      // Do nothing, this is caused by our security manager that disables 
System.exit
-    }
-
-    System.setSecurityManager(null);
-  }
-
-  public static void deleteTopic(String kafkaTopic, String zkStr) {
-    invokeTopicCommand(new String[]{"--delete", "--zookeeper", zkStr, 
"--topic", kafkaTopic});
-  }
-}
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
new file mode 100644
index 0000000..0eb4ac6
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka.server;
+
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+
+
+public class KafkaDataProducer implements StreamDataProducer {
+  Producer<byte[], byte[]> producer;
+
+  @Override
+  public void init(Properties props) {
+    ProducerConfig producerConfig = new ProducerConfig(props);
+    this.producer = new Producer(producerConfig);
+  }
+
+  @Override
+  public void produce(String topic, byte[] payload) {
+    KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, payload);
+    this.produce(data);
+  }
+
+  @Override
+  public void produce(String topic, byte[] key, byte[] payload) {
+    KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, key, 
payload);
+    this.produce(data);
+  }
+
+  public void produce(KeyedMessage message) {
+    producer.send(message);
+  }
+
+  @Override
+  public void close() {
+    producer.close();
+  }
+}
diff --git 
a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
new file mode 100644
index 0000000..1c2a8ff
--- /dev/null
+++ 
b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka.server;
+
+import java.io.File;
+import java.security.Permission;
+import java.util.Properties;
+import kafka.admin.TopicCommand;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaDataServerStartable implements StreamDataServerStartable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaDataServerStartable.class);
+
+  private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+  private static final String LOG_DIRS = "log.dirs";
+
+  private KafkaServerStartable serverStartable;
+  private String zkStr;
+  private String logDirPath;
+
+  private static void invokeTopicCommand(String[] args) {
+    // jfim: Use Java security to trap System.exit in Kafka 0.9's TopicCommand
+    System.setSecurityManager(new SecurityManager() {
+      @Override
+      public void checkPermission(Permission perm) {
+        if (perm.getName().startsWith("exitVM")) {
+          throw new SecurityException("System.exit is disabled");
+        }
+      }
+
+      @Override
+      public void checkPermission(Permission perm, Object context) {
+        checkPermission(perm);
+      }
+    });
+
+    try {
+      TopicCommand.main(args);
+    } catch (SecurityException ex) {
+      // Do nothing, this is caused by our security manager that disables 
System.exit
+    }
+
+    System.setSecurityManager(null);
+  }
+
+  public void init(Properties props) {
+    zkStr = props.getProperty(ZOOKEEPER_CONNECT);
+    logDirPath = props.getProperty(LOG_DIRS);
+
+    // Create the ZK nodes for Kafka, if needed
+    int indexOfFirstSlash = zkStr.indexOf('/');
+    if (indexOfFirstSlash != -1) {
+      String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
+      String zkNodePath = zkStr.substring(indexOfFirstSlash);
+      ZkClient client = new ZkClient(bareZkUrl);
+      client.createPersistent(zkNodePath, true);
+      client.close();
+    }
+
+    File logDir = new File(logDirPath);
+    logDir.mkdirs();
+
+    props.put("zookeeper.session.timeout.ms", "60000");
+    KafkaConfig config = new KafkaConfig(props);
+
+    serverStartable = new KafkaServerStartable(config);
+  }
+
+  @Override
+  public void start() {
+    serverStartable.startup();
+  }
+
+  @Override
+  public void stop() {
+    serverStartable.shutdown();
+    FileUtils.deleteQuietly(new 
File(serverStartable.serverConfig().logDirs().apply(0)));
+  }
+
+  @Override
+  public void createTopic(String topic, Properties props) {
+    invokeTopicCommand(
+        new String[]{"--create", "--zookeeper", this.zkStr, 
"--replication-factor", "1", "--partitions", Integer.toString(
+            (Integer) props.get("partition")), "--topic", topic});
+  }
+}
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 51c9231..d074639 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -161,10 +161,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
       <groupId>net.sf.jopt-simple</groupId>
       <artifactId>jopt-simple</artifactId>
     </dependency>
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
new file mode 100644
index 0000000..53275a0
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataProducer is the interface for stream data producers. E.g. 
KafkaDataProducer, KinesisDataProducer.
+ */
+public interface StreamDataProducer {
+  void init(Properties props);
+
+  void produce(String topic, byte[] payload);
+
+  void produce(String topic, byte[] key, byte[] payload);
+
+  void close();
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
new file mode 100644
index 0000000..05e0827
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataProvider provides StreamDataServerStartable and 
StreamDataProducer based on
+ * given implementation class name.
+ * E.g. KafkaDataServerStartable, KafkaDataProducer.
+ *
+ */
+public class StreamDataProvider {
+  public static StreamDataServerStartable getServerDataStartable(String clazz, 
Properties props)
+      throws Exception {
+    final StreamDataServerStartable streamDataServerStartable =
+        (StreamDataServerStartable) Class.forName(clazz).newInstance();
+    streamDataServerStartable.init(props);
+    return streamDataServerStartable;
+  }
+
+  public static StreamDataProducer getStreamDataProducer(String clazz, 
Properties props)
+      throws Exception {
+
+    final StreamDataProducer streamDataProducer = (StreamDataProducer) 
Class.forName(clazz).newInstance();
+    streamDataProducer.init(props);
+    return streamDataProducer;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
new file mode 100644
index 0000000..34a1a38
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataServerStartable is the interface for stream data sources.
+ * Each stream data connector should implement a mock/wrapper of the data 
server.
+ *
+ * E.g. KafkaDataServerStartable is a wrapper class of Kafka 0.9 broker.
+ *
+ */
+public interface StreamDataServerStartable {
+  /**
+   * Init the server.
+   *
+   * @param props
+   */
+  void init(Properties props);
+
+  /**
+   * Start the server
+   */
+  void start();
+
+  /**
+   * Stop the server
+   */
+  void stop();
+
+  /**
+   * Create a data stream (e.g Kafka topic) in the server.
+   *
+   * @param topic
+   * @param topicProps
+   */
+  void createTopic(String topic, Properties topicProps);
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
index b6b41ff..dca78da 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
@@ -26,7 +26,6 @@ import org.joda.time.MutableDateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import scala.collection.mutable.StringBuilder;
 
 
 public class FunctionExpressionEvaluatorTest {
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index b58647f..8277e84 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -191,8 +191,9 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+      <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
       <version>${project.version}</version>
+      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.testng</groupId>
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 67fabc6..d94b7b6 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ConnectionFactory;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
@@ -40,7 +39,8 @@ import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 
@@ -73,12 +73,13 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   protected final File _avroDir = new File(_tempDir, "avroDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
-  protected List<KafkaServerStartable> _kafkaStarters;
+  protected List<StreamDataServerStartable> _kafkaStarters;
 
   private org.apache.pinot.client.Connection _pinotConnection;
   private Connection _h2Connection;
   private QueryGenerator _queryGenerator;
 
+
   /**
    * The following getters can be overridden to change default settings.
    */
@@ -318,8 +319,9 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
       @Override
       public void run() {
         try {
-          ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
-              getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), 
getPartitionColumn());
+          ClusterIntegrationTestUtils
+              .pushAvroIntoKafka(avroFiles, 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
+                  getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), 
getPartitionColumn());
         } catch (Exception e) {
           // Ignored
         }
@@ -328,15 +330,17 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   }
 
   protected void startKafka() {
-    _kafkaStarters = KafkaStarterUtils
-        .startServers(getNumKafkaBrokers(), 
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
+
+    _kafkaStarters =
+        KafkaStarterUtils.startServers(getNumKafkaBrokers(), 
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
             KafkaStarterUtils.getDefaultKafkaConfiguration());
-    KafkaStarterUtils.createTopic(getKafkaTopic(), 
KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
+    _kafkaStarters.get(0)
+        .createTopic(getKafkaTopic(), 
KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
   }
 
   protected void stopKafka() {
-    for (KafkaServerStartable kafkaStarter : _kafkaStarters) {
-      KafkaStarterUtils.stopServer(kafkaStarter);
+    for (StreamDataServerStartable kafkaStarter : _kafkaStarters) {
+      kafkaStarter.stop();
     }
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index fc14789..374740c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -51,7 +51,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.ITestContext;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index 0983d4e..d1fd9ff 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -33,7 +33,7 @@ import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index be24be8..ec15591 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -34,14 +34,14 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
-import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.broker.requesthandler.PinotQueryRequest;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.query.comparison.QueryComparison;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.ITestResult;
@@ -188,7 +188,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
     private List<File> _realtimeAvroFiles;
     private File _queryFile;
     private File _responseFile;
-    private KafkaServerStartable _kafkaStarter;
+    private StreamDataServerStartable _kafkaStarter;
     private long _countStarResult;
 
     public CustomHybridClusterIntegrationTest() {
@@ -262,7 +262,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
           KafkaStarterUtils.getDefaultKafkaConfiguration());
 
       // Create Kafka topic
-      KafkaStarterUtils.createTopic(getKafkaTopic(), KAFKA_ZK_STR, 
getNumKafkaPartitions());
+      _kafkaStarter.createTopic(getKafkaTopic(), 
KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
 
       // Start the Pinot cluster
       ControllerConf config = getDefaultControllerConfiguration();
@@ -379,7 +379,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       stopServer();
       stopBroker();
       stopController();
-      KafkaStarterUtils.stopServer(_kafkaStarter);
+      _kafkaStarter.stop();
       stopZk();
 
       FileUtils.deleteDirectory(_tempDir);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 8b4baeb..6dc4fc6 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 74bb09f..3c0165e 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -52,8 +52,9 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+      <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
       <version>${project.version}</version>
+      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
index 8bbb4d5..e623ece 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import kafka.server.KafkaServerStartable;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
 import org.apache.pinot.util.TestUtils;
 
@@ -58,12 +58,12 @@ public class BenchmarkRealtimeConsumptionSpeed extends 
RealtimeClusterIntegratio
       throws Exception {
     // Start ZK and Kafka
     startZk();
-    KafkaServerStartable kafkaStarter = KafkaStarterUtils
+    StreamDataServerStartable kafkaStarter = KafkaStarterUtils
         .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, 
KafkaStarterUtils.DEFAULT_BROKER_ID,
             KafkaStarterUtils.DEFAULT_ZK_STR, 
KafkaStarterUtils.getDefaultKafkaConfiguration());
 
     // Create Kafka topic
-    KafkaStarterUtils.createTopic(getKafkaTopic(), 
KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+    kafkaStarter.createTopic(getKafkaTopic(), 
KafkaStarterUtils.getTopicCreationProps(10));
 
     // Unpack data (needed to get the Avro schema)
     TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
index 71d28e7..b945fc5 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import kafka.server.KafkaServerStartable;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.apache.pinot.integration.tests.OfflineClusterIntegrationTest;
 import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
 import org.apache.pinot.util.TestUtils;
@@ -59,12 +59,12 @@ public class RealtimeStressTest extends 
RealtimeClusterIntegrationTest {
       throws Exception {
     // Start ZK and Kafka
     startZk();
-    KafkaServerStartable kafkaStarter = KafkaStarterUtils
+    StreamDataServerStartable kafkaStarter = KafkaStarterUtils
         .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, 
KafkaStarterUtils.DEFAULT_BROKER_ID,
             KafkaStarterUtils.DEFAULT_ZK_STR, 
KafkaStarterUtils.getDefaultKafkaConfiguration());
 
     // Create Kafka topic
-    KafkaStarterUtils.createTopic(getKafkaTopic(), 
KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+    kafkaStarter.createTopic(getKafkaTopic(), 
KafkaStarterUtils.getTopicCreationProps(10));
 
     // Unpack data (needed to get the Avro schema)
     TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 6e18295..cf9f507 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -56,8 +56,9 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+      <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
       <version>${project.version}</version>
+      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>commons-cli</groupId>
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index fe004d3..08d9b2a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -23,12 +23,13 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import kafka.server.KafkaServerStartable;
+import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.core.data.readers.FileFormat;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.AirlineDataStream;
@@ -37,16 +38,21 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
 
 
 public class HybridQuickstart {
-  private HybridQuickstart() {
-  }
-
   private File _offlineQuickStartDataDir;
   private File _realtimeQuickStartDataDir;
-  private KafkaServerStartable _kafkaStarter;
+  private StreamDataServerStartable _kafkaStarter;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private File _schemaFile;
   private File _dataFile;
 
+  private HybridQuickstart() {
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    new HybridQuickstart().execute();
+  }
+
   private QuickstartTableRequest prepareOfflineTableRequest()
       throws IOException {
     _offlineQuickStartDataDir = new File("quickStartData" + 
System.currentTimeMillis());
@@ -94,11 +100,14 @@ public class HybridQuickstart {
   private void startKafka() {
     _zookeeperInstance = ZkStarter.startLocalZkServer();
 
-    _kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, 
KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, 
KafkaStarterUtils.getDefaultKafkaConfiguration());
-
-    KafkaStarterUtils.createTopic("airlineStatsEvents", 
KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+    String kafkaClazz = 
"org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz, 
KafkaStarterUtils.getDefaultKafkaConfiguration());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+    _kafkaStarter.start();
+    _kafkaStarter.createTopic("airlineStatsEvents", 
KafkaStarterUtils.getTopicCreationProps(10));
   }
 
   public void execute()
@@ -153,7 +162,7 @@ public class HybridQuickstart {
           stream.shutdown();
           Thread.sleep(2000);
           runner.stop();
-          KafkaStarterUtils.stopServer(_kafkaStarter);
+          _kafkaStarter.stop();
           ZkStarter.stopLocalZkServer(_zookeeperInstance);
           FileUtils.deleteDirectory(_offlineQuickStartDataDir);
           FileUtils.deleteDirectory(_realtimeQuickStartDataDir);
@@ -163,9 +172,4 @@ public class HybridQuickstart {
       }
     });
   }
-
-  public static void main(String[] args)
-      throws Exception {
-    new HybridQuickstart().execute();
-  }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/KafkaStarterUtils.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/KafkaStarterUtils.java
new file mode 100644
index 0000000..73da8dc
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/KafkaStarterUtils.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+
+
+public class KafkaStarterUtils {
+  public static final int DEFAULT_BROKER_ID = 0;
+  public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + 
"/kafka";
+  public static int DEFAULT_KAFKA_PORT = 19092;
+  public static final String DEFAULT_KAFKA_BROKER = "localhost:" + 
DEFAULT_KAFKA_PORT;
+
+  public static final String PORT = "port";
+  public static final String BROKER_ID = "broker.id";
+  private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+  private static final String LOG_DIRS = "log.dirs";
+
+  public static final String KAFKA_SERVER_STARTABLE_CLASS_NAME =
+      
"org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+  public static final String KAFKA_PRODUCER_CLASS_NAME =
+      "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer";
+  public static final String KAFKA_JSON_MESSAGE_DECODER_CLASS_NAME =
+      "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder";
+
+  public static Properties getDefaultKafkaConfiguration() {
+    final Properties configuration = new Properties();
+
+    // Enable topic deletion by default for integration tests
+    configureTopicDeletion(configuration, true);
+
+    // Set host name
+    configureHostName(configuration, "localhost");
+
+    configuration.put(PORT, DEFAULT_KAFKA_PORT);
+    configuration.put(BROKER_ID, DEFAULT_BROKER_ID);
+    configuration.put(ZOOKEEPER_CONNECT, DEFAULT_ZK_STR);
+    configuration.put(LOG_DIRS, "/tmp/kafka-" + 
Double.toHexString(Math.random()));
+
+    return configuration;
+  }
+
+  public static void configureTopicDeletion(Properties configuration, boolean 
topicDeletionEnabled) {
+    configuration.put("delete.topic.enable", 
Boolean.toString(topicDeletionEnabled));
+  }
+
+  public static void configureHostName(Properties configuration, String 
hostName) {
+    configuration.put("host.name", hostName);
+  }
+
+  public static Properties getTopicCreationProps(int numKafkaPartitions) {
+    Properties topicProps = new Properties();
+    topicProps.put("partition", numKafkaPartitions);
+    return topicProps;
+  }
+
+  public static List<StreamDataServerStartable> startServers(final int 
brokerCount, final int port, final String zkStr,
+      final Properties configuration) {
+    List<StreamDataServerStartable> startables = new ArrayList<>(brokerCount);
+
+    for (int i = 0; i < brokerCount; i++) {
+      startables.add(startServer(port + i, i, zkStr, configuration));
+    }
+    return startables;
+  }
+
+  public static StreamDataServerStartable startServer(final int port, final 
int brokerId, final String zkStr,
+      final Properties configuration) {
+    StreamDataServerStartable kafkaStarter;
+    try {
+      configuration.put(KafkaStarterUtils.PORT, port);
+      configuration.put(KafkaStarterUtils.BROKER_ID, brokerId);
+      configuration.put(KafkaStarterUtils.ZOOKEEPER_CONNECT, zkStr);
+      configuration.put(KafkaStarterUtils.LOG_DIRS, "/tmp/kafka-" + 
Double.toHexString(Math.random()));
+      kafkaStarter = 
StreamDataProvider.getServerDataStartable(KAFKA_SERVER_STARTABLE_CLASS_NAME, 
configuration);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + 
KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
+    }
+    kafkaStarter.start();
+    return kafkaStarter;
+  }
+}
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 8df1fbb..b970648 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -22,10 +22,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.URL;
-import kafka.server.KafkaServerStartable;
+import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.MeetupRsvpStream;
@@ -35,9 +36,16 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
 
 
 public class RealtimeQuickStart {
+  private StreamDataServerStartable _kafkaStarter;
+
   private RealtimeQuickStart() {
   }
 
+  public static void main(String[] args)
+      throws Exception {
+    new RealtimeQuickStart().execute();
+  }
+
   public void execute()
       throws Exception {
     final File quickStartDataDir = new File("quickStartData" + 
System.currentTimeMillis());
@@ -64,10 +72,16 @@ public class RealtimeQuickStart {
 
     printStatus(Color.CYAN, "***** Starting Kafka *****");
     final ZkStarter.ZookeeperInstance zookeeperInstance = 
ZkStarter.startLocalZkServer();
-    final KafkaServerStartable kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, 
KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, 
KafkaStarterUtils.getDefaultKafkaConfiguration());
-    KafkaStarterUtils.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+
+    String kafkaClazz = 
"org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz, 
KafkaStarterUtils.getDefaultKafkaConfiguration());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+    _kafkaStarter.start();
+    _kafkaStarter.createTopic("meetupRSVPEvents", 
KafkaStarterUtils.getTopicCreationProps(10));
+
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and 
broker *****");
     runner.startAll();
     printStatus(Color.CYAN, "***** Adding meetupRSVP schema *****");
@@ -87,7 +101,7 @@ public class RealtimeQuickStart {
           printStatus(Color.GREEN, "***** Shutting down realtime quick start 
*****");
           meetupRSVPProvider.stopPublishing();
           runner.stop();
-          KafkaStarterUtils.stopServer(kafkaStarter);
+          _kafkaStarter.stop();
           ZkStarter.stopLocalZkServer(zookeeperInstance);
           FileUtils.deleteDirectory(quickStartDataDir);
         } catch (Exception e) {
@@ -130,9 +144,4 @@ public class RealtimeQuickStart {
 
     printStatus(Color.GREEN, "You can always go to 
http://localhost:9000/query/ to play around in the query console");
   }
-
-  public static void main(String[] args)
-      throws Exception {
-    new RealtimeQuickStart().execute();
-  }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
index 38f042c..53a1d1c 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
@@ -20,8 +20,10 @@ package org.apache.pinot.tools.admin.command;
 
 import java.io.File;
 import java.io.IOException;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Command;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +34,7 @@ import org.slf4j.LoggerFactory;
  */
 public class StartKafkaCommand extends AbstractBaseAdminCommand implements 
Command {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StartKafkaCommand.class);
+
   @Option(name = "-port", required = false, metaVar = "<int>", usage = "Port 
to start Kafka server on.")
   private int _port = KafkaStarterUtils.DEFAULT_KAFKA_PORT;
 
@@ -43,6 +46,7 @@ public class StartKafkaCommand extends 
AbstractBaseAdminCommand implements Comma
 
   @Option(name = "-zkAddress", required = false, metaVar = "<string>", usage = 
"Address of Zookeeper.")
   private String _zkAddress = "localhost:2181";
+  private StreamDataServerStartable _kafkaStarter;
 
   @Override
   public boolean getHelp() {
@@ -67,7 +71,13 @@ public class StartKafkaCommand extends 
AbstractBaseAdminCommand implements Comma
   @Override
   public boolean execute()
       throws IOException {
-    KafkaStarterUtils.startServer(_port, _brokerId, _zkAddress, 
KafkaStarterUtils.getDefaultKafkaConfiguration());
+    String kafkaClazz = 
"org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz, 
KafkaStarterUtils.getDefaultKafkaConfiguration());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+    _kafkaStarter.start();
 
     LOGGER.info("Start kafka at localhost:" + _port + " in thread " + 
Thread.currentThread().getName());
 
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 0a75023..e9639a8 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -24,15 +24,14 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.core.util.AvroUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
 import org.apache.pinot.tools.Command;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +42,6 @@ import org.slf4j.LoggerFactory;
  */
 public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand 
implements Command {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamAvroIntoKafkaCommand.class);
-
   @Option(name = "-avroFile", required = true, metaVar = "<String>", usage = 
"Avro file to stream.")
   private String _avroFile = null;
 
@@ -104,8 +102,12 @@ public class StreamAvroIntoKafkaCommand extends 
AbstractBaseAdminCommand impleme
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
 
-    ProducerConfig producerConfig = new ProducerConfig(properties);
-    Producer<byte[], byte[]> producer = new Producer<byte[], 
byte[]>(producerConfig);
+    StreamDataProducer streamDataProducer;
+    try {
+      streamDataProducer = 
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
 properties);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get StreamDataProducer - " + 
KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, e);
+    }
     try {
       // Open the Avro file
       DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(new 
File(_avroFile));
@@ -115,11 +117,7 @@ public class StreamAvroIntoKafkaCommand extends 
AbstractBaseAdminCommand impleme
         // Write the message to Kafka
         String recordJson = genericRecord.toString();
         byte[] bytes = recordJson.getBytes("utf-8");
-        KeyedMessage<byte[], byte[]> data =
-            new KeyedMessage<byte[], byte[]>(_kafkaTopic, 
Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)),
-                bytes);
-
-        producer.send(data);
+        streamDataProducer.produce(_kafkaTopic, 
Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
 
         // Sleep between messages
         if (sleepRequired) {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index 7c238bf..1d6a4b1 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -27,9 +27,6 @@ import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -37,7 +34,9 @@ import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.data.TimeFieldSpec;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.apache.pinot.tools.Quickstart;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,12 +50,12 @@ public class AirlineDataStream {
   DataFileStream<GenericRecord> avroDataStream;
   Integer currentTimeValue = 16102;
   boolean keepIndexing = true;
-  private Producer<String, byte[]> producer;
   ExecutorService service;
   int counter = 0;
+  private StreamDataProducer producer;
 
   public AirlineDataStream(Schema pinotSchema, File avroFile)
-      throws FileNotFoundException, IOException {
+      throws Exception {
     this.pinotSchema = pinotSchema;
     this.avroFile = avroFile;
     createStream();
@@ -65,8 +64,8 @@ public class AirlineDataStream {
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
 
-    ProducerConfig producerConfig = new ProducerConfig(properties);
-    producer = new Producer<String, byte[]>(producerConfig);
+    producer = 
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
 properties);
+
     service = Executors.newFixedThreadPool(1);
     Quickstart.printStatus(Quickstart.Color.YELLOW,
         "***** Offine data has max time as 16101, realtime will start 
consuming from time 16102 and increment time every 3000 events *****");
@@ -97,9 +96,7 @@ public class AirlineDataStream {
       avroDataStream = null;
       return;
     }
-    KeyedMessage<String, byte[]> data =
-        new KeyedMessage<String, byte[]>("airlineStatsEvents", 
message.toString().getBytes("UTF-8"));
-    producer.send(data);
+    producer.produce("airlineStatsEvents", 
message.toString().getBytes("UTF-8"));
   }
 
   public void run() {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index 114072f..d0d9ed1 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -23,40 +23,39 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
 import java.util.Properties;
 import javax.websocket.ClientEndpointConfig;
 import javax.websocket.Endpoint;
 import javax.websocket.EndpointConfig;
 import javax.websocket.MessageHandler;
 import javax.websocket.Session;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.tools.KafkaStarterUtils;
 import org.glassfish.tyrus.client.ClientManager;
 
 
 public class MeetupRsvpStream {
+
+  private static final String DEFAULT_KAFKA_BROKER = "localhost:19092";
+
   private Schema schema;
-  private Producer<String, byte[]> producer;
+  private StreamDataProducer producer;
   private boolean keepPublishing = true;
   private ClientManager client;
 
   public MeetupRsvpStream(File schemaFile)
-      throws IOException, URISyntaxException {
+      throws Exception {
     schema = Schema.fromFile(schemaFile);
-
     Properties properties = new Properties();
-    properties.put("metadata.broker.list", 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+    properties.put("metadata.broker.list", DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
-
-    ProducerConfig producerConfig = new ProducerConfig(properties);
-    producer = new Producer<String, byte[]>(producerConfig);
+    producer = 
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
 properties);
   }
 
   public void stopPublishing() {
@@ -66,9 +65,9 @@ public class MeetupRsvpStream {
 
   public void run() {
     try {
-
       final ClientEndpointConfig cec = 
ClientEndpointConfig.Builder.create().build();
-      final KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
+      final StreamMessageDecoder decoder =
+          (StreamMessageDecoder) 
Class.forName(KafkaStarterUtils.KAFKA_JSON_MESSAGE_DECODER_CLASS_NAME).newInstance();
       decoder.init(null, schema, null);
       client = ClientManager.createClient();
       client.connectToServer(new Endpoint() {
@@ -108,9 +107,7 @@ public class MeetupRsvpStream {
                   extracted.put("rsvp_count", 1);
 
                   if (keepPublishing) {
-                    KeyedMessage<String, byte[]> data =
-                        new KeyedMessage<String, byte[]>("meetupRSVPEvents", 
extracted.toString().getBytes("UTF-8"));
-                    producer.send(data);
+                    producer.produce("meetupRSVPEvents", 
extracted.toString().getBytes(StandardCharsets.UTF_8));
                   }
                 } catch (Exception e) {
                   //LOGGER.error("error processing raw event ", e);
diff --git a/pom.xml b/pom.xml
index a734f94..0d00729 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,8 +141,7 @@
     kafka dependency is still explicitly defined in pinot-integration-tests, 
pinot-tools and pinot-perf pom files.
     To change kafka connector dependency, we only need to update this version 
number config.
     TODO: figure out a way to inject kafka dependency instead of explicitly 
setting the kafka module dependency -->
-    <kafka.version>0.9</kafka.version>
-    <kafka.scala.version>2.10</kafka.scala.version>
+    <kafka.lib.version>0.9</kafka.lib.version>
   </properties>
 
   <profiles>
@@ -929,12 +928,6 @@
         <artifactId>jopt-simple</artifactId>
         <version>4.6</version>
       </dependency>
-      <!-- kafka_2.10 & larray use scala-library -->
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-library</artifactId>
-        <version>2.10.5</version>
-      </dependency>
       <dependency>
         <groupId>commons-lang</groupId>
         <artifactId>commons-lang</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to