This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eaecd64 Refactor pinot-connectors to break the dependencies from kafka 0.9 (#4445)
eaecd64 is described below
commit eaecd64e47d9090f7c37a2876fb5808cd3893739
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Jul 19 16:16:55 2019 -0700
Refactor pinot-connectors to break the dependencies from kafka 0.9 (#4445)
* Refactor pinot-connectors to break the dependencies from kafka 0.9
* address comments
---
pinot-common/pom.xml | 4 -
pinot-connectors/pinot-connector-kafka-0.9/pom.xml | 7 +-
.../realtime/impl/kafka/KafkaStarterUtils.java | 171 ---------------------
.../impl/kafka/server/KafkaDataProducer.java | 57 +++++++
.../kafka/server/KafkaDataServerStartable.java | 109 +++++++++++++
pinot-core/pom.xml | 4 -
.../core/realtime/stream/StreamDataProducer.java | 35 +++++
.../core/realtime/stream/StreamDataProvider.java | 46 ++++++
.../realtime/stream/StreamDataServerStartable.java | 56 +++++++
.../function/FunctionExpressionEvaluatorTest.java | 1 -
pinot-integration-tests/pom.xml | 3 +-
.../tests/BaseClusterIntegrationTest.java | 24 +--
.../ControllerPeriodicTasksIntegrationTests.java | 2 +-
.../tests/HybridClusterIntegrationTest.java | 2 +-
...ridClusterIntegrationTestCommandLineRunner.java | 10 +-
.../tests/RealtimeClusterIntegrationTest.java | 2 +-
pinot-perf/pom.xml | 3 +-
.../perf/BenchmarkRealtimeConsumptionSpeed.java | 8 +-
.../org/apache/pinot/perf/RealtimeStressTest.java | 8 +-
pinot-tools/pom.xml | 3 +-
.../org/apache/pinot/tools/HybridQuickstart.java | 38 +++--
.../org/apache/pinot/tools/KafkaStarterUtils.java | 103 +++++++++++++
.../org/apache/pinot/tools/RealtimeQuickStart.java | 33 ++--
.../tools/admin/command/StartKafkaCommand.java | 14 +-
.../admin/command/StreamAvroIntoKafkaCommand.java | 22 ++-
.../pinot/tools/streams/AirlineDataStream.java | 19 +--
.../pinot/tools/streams/MeetupRsvpStream.java | 33 ++--
pom.xml | 9 +-
28 files changed, 536 insertions(+), 290 deletions(-)
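In short: pinot-core gains stream-agnostic interfaces (StreamDataProducer, StreamDataServerStartable) plus a reflective factory (StreamDataProvider), and only the pinot-connector-kafka-0.9 module keeps a compile-time dependency on kafka 0.9. A minimal sketch of the resulting call pattern (class and method names are taken from this diff; the property values and topic name are illustrative only):

    import java.util.Properties;
    import org.apache.pinot.core.realtime.stream.StreamDataProvider;
    import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;

    public class StartKafkaSketch {
      public static void main(String[] args) throws Exception {
        // The implementation is referenced by name only, so callers have no
        // compile-time dependency on kafka.* classes.
        String clazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka"); // illustrative value
        props.put("log.dirs", "/tmp/kafka-sketch");             // illustrative value
        StreamDataServerStartable kafka = StreamDataProvider.getServerDataStartable(clazz, props);
        kafka.start();

        Properties topicProps = new Properties();
        topicProps.put("partition", 10); // partition count, read back as an Integer
        kafka.createTopic("mySketchTopic", topicProps); // topic name is a placeholder
        kafka.stop();
      }
    }

This is the same pattern HybridQuickstart, RealtimeQuickStart and StartKafkaCommand switch to further down in this diff.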
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 2e27ac8..41c9b39 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -198,10 +198,6 @@
<artifactId>jopt-simple</artifactId>
</dependency>
<dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
- <dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index e8f6c93..0450102 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -42,7 +42,7 @@
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${kafka.scala.version}</artifactId>
+ <artifactId>kafka_2.10</artifactId>
<version>${kafka.lib.version}</version>
<exclusions>
<exclusion>
@@ -63,5 +63,10 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.10.5</version>
+ </dependency>
</dependencies>
</project>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
deleted file mode 100644
index 5f1de99..0000000
--- a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka;
-
-import java.io.File;
-import java.security.Permission;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import kafka.admin.TopicCommand;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-
-
-/**
- * Utilities to start Kafka during unit tests.
- *
- */
-public class KafkaStarterUtils {
- public static final int DEFAULT_KAFKA_PORT = 19092;
- public static final int DEFAULT_BROKER_ID = 0;
- public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + "/kafka";
- public static final String DEFAULT_KAFKA_BROKER = "localhost:" + DEFAULT_KAFKA_PORT;
-
- public static Properties getDefaultKafkaConfiguration() {
- final Properties configuration = new Properties();
-
- // Enable topic deletion by default for integration tests
- configureTopicDeletion(configuration, true);
-
- // Set host name
- configureHostName(configuration, "localhost");
-
- return configuration;
- }
-
- public static List<KafkaServerStartable> startServers(final int brokerCount, final int port, final String zkStr,
- final Properties configuration) {
- List<KafkaServerStartable> startables = new ArrayList<>(brokerCount);
-
- for (int i = 0; i < brokerCount; i++) {
- startables.add(startServer(port + i, i, zkStr, "/tmp/kafka-" + Double.toHexString(Math.random()), configuration));
- }
-
- return startables;
- }
-
- public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
- final Properties configuration) {
- return startServer(port, brokerId, zkStr, "/tmp/kafka-" + Double.toHexString(Math.random()), configuration);
- }
-
- public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
- final String logDirPath, final Properties configuration) {
- // Create the ZK nodes for Kafka, if needed
- int indexOfFirstSlash = zkStr.indexOf('/');
- if (indexOfFirstSlash != -1) {
- String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
- String zkNodePath = zkStr.substring(indexOfFirstSlash);
- ZkClient client = new ZkClient(bareZkUrl);
- client.createPersistent(zkNodePath, true);
- client.close();
- }
-
- File logDir = new File(logDirPath);
- logDir.mkdirs();
-
- configureKafkaPort(configuration, port);
- configureZkConnectionString(configuration, zkStr);
- configureBrokerId(configuration, brokerId);
- configureKafkaLogDirectory(configuration, logDir);
- configuration.put("zookeeper.session.timeout.ms", "60000");
- KafkaConfig config = new KafkaConfig(configuration);
-
- KafkaServerStartable serverStartable = new KafkaServerStartable(config);
- serverStartable.startup();
-
- return serverStartable;
- }
-
- public static void configureSegmentSizeBytes(Properties properties, int segmentSize) {
- properties.put("log.segment.bytes", Integer.toString(segmentSize));
- }
-
- public static void configureLogRetentionSizeBytes(Properties properties, int logRetentionSizeBytes) {
- properties.put("log.retention.bytes", Integer.toString(logRetentionSizeBytes));
- }
-
- public static void configureKafkaLogDirectory(Properties configuration, File logDir) {
- configuration.put("log.dirs", logDir.getAbsolutePath());
- }
-
- public static void configureBrokerId(Properties configuration, int brokerId) {
- configuration.put("broker.id", Integer.toString(brokerId));
- }
-
- public static void configureZkConnectionString(Properties configuration, String zkStr) {
- configuration.put("zookeeper.connect", zkStr);
- }
-
- public static void configureKafkaPort(Properties configuration, int port) {
- configuration.put("port", Integer.toString(port));
- }
-
- public static void configureTopicDeletion(Properties configuration, boolean topicDeletionEnabled) {
- configuration.put("delete.topic.enable", Boolean.toString(topicDeletionEnabled));
- }
-
- public static void configureHostName(Properties configuration, String hostName) {
- configuration.put("host.name", hostName);
- }
-
- public static void stopServer(KafkaServerStartable serverStartable) {
- serverStartable.shutdown();
- FileUtils.deleteQuietly(new File(serverStartable.serverConfig().logDirs().apply(0)));
- }
-
- public static void createTopic(String kafkaTopic, String zkStr, int partitionCount) {
- invokeTopicCommand(
- new String[]{"--create", "--zookeeper", zkStr, "--replication-factor",
"1", "--partitions", Integer.toString(
- partitionCount), "--topic", kafkaTopic});
- }
-
- private static void invokeTopicCommand(String[] args) {
- // jfim: Use Java security to trap System.exit in Kafka 0.9's TopicCommand
- System.setSecurityManager(new SecurityManager() {
- @Override
- public void checkPermission(Permission perm) {
- if (perm.getName().startsWith("exitVM")) {
- throw new SecurityException("System.exit is disabled");
- }
- }
-
- @Override
- public void checkPermission(Permission perm, Object context) {
- checkPermission(perm);
- }
- });
-
- try {
- TopicCommand.main(args);
- } catch (SecurityException ex) {
- // Do nothing, this is caused by our security manager that disables System.exit
- }
-
- System.setSecurityManager(null);
- }
-
- public static void deleteTopic(String kafkaTopic, String zkStr) {
- invokeTopicCommand(new String[]{"--delete", "--zookeeper", zkStr, "--topic", kafkaTopic});
- }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
new file mode 100644
index 0000000..0eb4ac6
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka.server;
+
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+
+
+public class KafkaDataProducer implements StreamDataProducer {
+ Producer<byte[], byte[]> producer;
+
+ @Override
+ public void init(Properties props) {
+ ProducerConfig producerConfig = new ProducerConfig(props);
+ this.producer = new Producer(producerConfig);
+ }
+
+ @Override
+ public void produce(String topic, byte[] payload) {
+ KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, payload);
+ this.produce(data);
+ }
+
+ @Override
+ public void produce(String topic, byte[] key, byte[] payload) {
+ KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, key, payload);
+ this.produce(data);
+ }
+
+ public void produce(KeyedMessage message) {
+ producer.send(message);
+ }
+
+ @Override
+ public void close() {
+ producer.close();
+ }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
new file mode 100644
index 0000000..1c2a8ff
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka.server;
+
+import java.io.File;
+import java.security.Permission;
+import java.util.Properties;
+import kafka.admin.TopicCommand;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaDataServerStartable implements StreamDataServerStartable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDataServerStartable.class);
+
+ private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+ private static final String LOG_DIRS = "log.dirs";
+
+ private KafkaServerStartable serverStartable;
+ private String zkStr;
+ private String logDirPath;
+
+ private static void invokeTopicCommand(String[] args) {
+ // jfim: Use Java security to trap System.exit in Kafka 0.9's TopicCommand
+ System.setSecurityManager(new SecurityManager() {
+ @Override
+ public void checkPermission(Permission perm) {
+ if (perm.getName().startsWith("exitVM")) {
+ throw new SecurityException("System.exit is disabled");
+ }
+ }
+
+ @Override
+ public void checkPermission(Permission perm, Object context) {
+ checkPermission(perm);
+ }
+ });
+
+ try {
+ TopicCommand.main(args);
+ } catch (SecurityException ex) {
+ // Do nothing, this is caused by our security manager that disables System.exit
+ }
+
+ System.setSecurityManager(null);
+ }
+
+ public void init(Properties props) {
+ zkStr = props.getProperty(ZOOKEEPER_CONNECT);
+ logDirPath = props.getProperty(LOG_DIRS);
+
+ // Create the ZK nodes for Kafka, if needed
+ int indexOfFirstSlash = zkStr.indexOf('/');
+ if (indexOfFirstSlash != -1) {
+ String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
+ String zkNodePath = zkStr.substring(indexOfFirstSlash);
+ ZkClient client = new ZkClient(bareZkUrl);
+ client.createPersistent(zkNodePath, true);
+ client.close();
+ }
+
+ File logDir = new File(logDirPath);
+ logDir.mkdirs();
+
+ props.put("zookeeper.session.timeout.ms", "60000");
+ KafkaConfig config = new KafkaConfig(props);
+
+ serverStartable = new KafkaServerStartable(config);
+ }
+
+ @Override
+ public void start() {
+ serverStartable.startup();
+ }
+
+ @Override
+ public void stop() {
+ serverStartable.shutdown();
+ FileUtils.deleteQuietly(new File(serverStartable.serverConfig().logDirs().apply(0)));
+ }
+
+ @Override
+ public void createTopic(String topic, Properties props) {
+ invokeTopicCommand(
+ new String[]{"--create", "--zookeeper", this.zkStr,
"--replication-factor", "1", "--partitions", Integer.toString(
+ (Integer) props.get("partition")), "--topic", topic});
+ }
+}
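One detail worth noting in KafkaDataServerStartable: Kafka 0.9's TopicCommand calls System.exit when it finishes, so invokeTopicCommand runs it under a temporary SecurityManager that vetoes exitVM. The same technique in isolation (a standalone sketch, not Pinot code; note SecurityManager is deprecated for removal in recent JDKs):

    // Trap System.exit so a library call cannot terminate the JVM.
    SecurityManager previous = System.getSecurityManager();
    System.setSecurityManager(new SecurityManager() {
      @Override
      public void checkPermission(java.security.Permission perm) {
        if (perm.getName().startsWith("exitVM")) {
          throw new SecurityException("System.exit is disabled");
        }
      }
    });
    try {
      System.exit(1); // stands in for TopicCommand.main(args)
    } catch (SecurityException expected) {
      // The exit was blocked; the JVM keeps running.
    } finally {
      System.setSecurityManager(previous); // restore the previous manager
    }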
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 51c9231..d074639 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -161,10 +161,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
- <dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</dependency>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
new file mode 100644
index 0000000..53275a0
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataProducer is the interface for writing payloads into a data stream. E.g. KafkaDataProducer.
+ */
+public interface StreamDataProducer {
+ void init(Properties props);
+
+ void produce(String topic, byte[] payload);
+
+ void produce(String topic, byte[] key, byte[] payload);
+
+ void close();
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
new file mode 100644
index 0000000..05e0827
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataProvider provides StreamDataServerStartable and StreamDataProducer based on
+ * given implementation class name.
+ * E.g. KafkaDataServerStartable, KafkaDataProducer.
+ *
+ */
+public class StreamDataProvider {
+ public static StreamDataServerStartable getServerDataStartable(String clazz, Properties props)
+ throws Exception {
+ final StreamDataServerStartable streamDataServerStartable =
+ (StreamDataServerStartable) Class.forName(clazz).newInstance();
+ streamDataServerStartable.init(props);
+ return streamDataServerStartable;
+ }
+
+ public static StreamDataProducer getStreamDataProducer(String clazz, Properties props)
+ throws Exception {
+
+ final StreamDataProducer streamDataProducer = (StreamDataProducer) Class.forName(clazz).newInstance();
+ streamDataProducer.init(props);
+ return streamDataProducer;
+ }
+}
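StreamDataProvider is a plain Class.forName factory, so producers follow the same pattern. A sketch of obtaining and using one, mirroring StreamAvroIntoKafkaCommand later in this diff (the broker address and topic are placeholders, and the calls must sit in a method declared throws Exception or inside a try/catch):

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:19092"); // placeholder broker
    props.put("serializer.class", "kafka.serializer.DefaultEncoder");
    props.put("request.required.acks", "1");
    StreamDataProducer producer = StreamDataProvider.getStreamDataProducer(
        "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer", props);
    producer.produce("mySketchTopic", "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    producer.close();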
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
new file mode 100644
index 0000000..34a1a38
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataServerStartable is the interface for stream data sources.
+ * Each stream data connector should implement a mock/wrapper of the data server.
+ *
+ * E.g. KafkaDataServerStartable is a wrapper class of Kafka 0.9 broker.
+ *
+ */
+public interface StreamDataServerStartable {
+ /**
+ * Init the server.
+ *
+ * @param props
+ */
+ void init(Properties props);
+
+ /**
+ * Start the server
+ */
+ void start();
+
+ /**
+ * Stop the server
+ */
+ void stop();
+
+ /**
+ * Create a data stream (e.g Kafka topic) in the server.
+ *
+ * @param topic
+ * @param topicProps
+ */
+ void createTopic(String topic, Properties topicProps);
+}
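Because the interface is stream-agnostic, supporting another stream only requires a new implementation class; callers keep passing a class name string to StreamDataProvider. A hypothetical skeleton (the class and its internals are invented for illustration; only the interface it implements is real):

    import java.util.Properties;
    import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;

    // Hypothetical in-memory stream server, for illustration only.
    public class InMemoryDataServerStartable implements StreamDataServerStartable {
      private boolean _running;

      @Override
      public void init(Properties props) {
        // Read whatever connection settings this stream needs from props.
      }

      @Override
      public void start() {
        _running = true;
      }

      @Override
      public void stop() {
        _running = false;
      }

      @Override
      public void createTopic(String topic, Properties topicProps) {
        // Allocate the stream named by `topic` with the requested partitions.
      }
    }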
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
index b6b41ff..dca78da 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
@@ -26,7 +26,6 @@ import org.joda.time.MutableDateTime;
import org.joda.time.format.DateTimeFormat;
import org.testng.Assert;
import org.testng.annotations.Test;
-import scala.collection.mutable.StringBuilder;
public class FunctionExpressionEvaluatorTest {
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index b58647f..8277e84 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -191,8 +191,9 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
- <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+ <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
<version>${project.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 67fabc6..d94b7b6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.common.config.ColumnPartitionConfig;
@@ -40,7 +39,8 @@ import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
@@ -73,12 +73,13 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected final File _avroDir = new File(_tempDir, "avroDir");
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
- protected List<KafkaServerStartable> _kafkaStarters;
+ protected List<StreamDataServerStartable> _kafkaStarters;
private org.apache.pinot.client.Connection _pinotConnection;
private Connection _h2Connection;
private QueryGenerator _queryGenerator;
+
/**
* The following getters can be overridden to change default settings.
*/
@@ -318,8 +319,9 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
@Override
public void run() {
try {
- ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
+ ClusterIntegrationTestUtils
+ .pushAvroIntoKafka(avroFiles, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
+ getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
} catch (Exception e) {
// Ignored
}
@@ -328,15 +330,17 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
}
protected void startKafka() {
- _kafkaStarters = KafkaStarterUtils
- .startServers(getNumKafkaBrokers(), KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
+
+ _kafkaStarters =
+ KafkaStarterUtils.startServers(getNumKafkaBrokers(), KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
KafkaStarterUtils.getDefaultKafkaConfiguration());
- KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
+ _kafkaStarters.get(0)
+ .createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
}
protected void stopKafka() {
- for (KafkaServerStartable kafkaStarter : _kafkaStarters) {
- KafkaStarterUtils.stopServer(kafkaStarter);
+ for (StreamDataServerStartable kafkaStarter : _kafkaStarters) {
+ kafkaStarter.stop();
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index fc14789..374740c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -51,7 +51,7 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.ITestContext;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index 0983d4e..d1fd9ff 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -33,7 +33,7 @@ import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index be24be8..ec15591 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -34,14 +34,14 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
-import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.broker.requesthandler.PinotQueryRequest;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
import org.apache.pinot.tools.query.comparison.QueryComparison;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.ITestResult;
@@ -188,7 +188,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
private List<File> _realtimeAvroFiles;
private File _queryFile;
private File _responseFile;
- private KafkaServerStartable _kafkaStarter;
+ private StreamDataServerStartable _kafkaStarter;
private long _countStarResult;
public CustomHybridClusterIntegrationTest() {
@@ -262,7 +262,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
KafkaStarterUtils.getDefaultKafkaConfiguration());
// Create Kafka topic
- KafkaStarterUtils.createTopic(getKafkaTopic(), KAFKA_ZK_STR, getNumKafkaPartitions());
+ _kafkaStarter.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
// Start the Pinot cluster
ControllerConf config = getDefaultControllerConfiguration();
@@ -379,7 +379,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
stopServer();
stopBroker();
stopController();
- KafkaStarterUtils.stopServer(_kafkaStarter);
+ _kafkaStarter.stop();
stopZk();
FileUtils.deleteDirectory(_tempDir);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 8b4baeb..6dc4fc6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 74bb09f..3c0165e 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -52,8 +52,9 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
- <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+ <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
<version>${project.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
index 8bbb4d5..e623ece 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import kafka.server.KafkaServerStartable;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
import org.apache.pinot.util.TestUtils;
@@ -58,12 +58,12 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio
throws Exception {
// Start ZK and Kafka
startZk();
- KafkaServerStartable kafkaStarter = KafkaStarterUtils
+ StreamDataServerStartable kafkaStarter = KafkaStarterUtils
.startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
// Create Kafka topic
- KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+ kafkaStarter.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(10));
// Unpack data (needed to get the Avro schema)
TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
index 71d28e7..b945fc5 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import kafka.server.KafkaServerStartable;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.apache.pinot.integration.tests.OfflineClusterIntegrationTest;
import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
import org.apache.pinot.util.TestUtils;
@@ -59,12 +59,12 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest {
throws Exception {
// Start ZK and Kafka
startZk();
- KafkaServerStartable kafkaStarter = KafkaStarterUtils
+ StreamDataServerStartable kafkaStarter = KafkaStarterUtils
.startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
// Create Kafka topic
- KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+ kafkaStarter.createTopic(getKafkaTopic(), KafkaStarterUtils.getTopicCreationProps(10));
// Unpack data (needed to get the Avro schema)
TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 6e18295..cf9f507 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -56,8 +56,9 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
- <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+ <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
<version>${project.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index fe004d3..08d9b2a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -23,12 +23,13 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.URL;
-import kafka.server.KafkaServerStartable;
+import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.core.data.readers.FileFormat;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.AirlineDataStream;
@@ -37,16 +38,21 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
public class HybridQuickstart {
- private HybridQuickstart() {
- }
-
private File _offlineQuickStartDataDir;
private File _realtimeQuickStartDataDir;
- private KafkaServerStartable _kafkaStarter;
+ private StreamDataServerStartable _kafkaStarter;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private File _schemaFile;
private File _dataFile;
+ private HybridQuickstart() {
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ new HybridQuickstart().execute();
+ }
+
private QuickstartTableRequest prepareOfflineTableRequest()
throws IOException {
_offlineQuickStartDataDir = new File("quickStartData" + System.currentTimeMillis());
@@ -94,11 +100,14 @@ public class HybridQuickstart {
private void startKafka() {
_zookeeperInstance = ZkStarter.startLocalZkServer();
- _kafkaStarter = KafkaStarterUtils
- .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
- KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
-
- KafkaStarterUtils.createTopic("airlineStatsEvents",
KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+ String kafkaClazz =
"org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+ try {
+ _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz, KafkaStarterUtils.getDefaultKafkaConfiguration());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start " + kafkaClazz, e);
+ }
+ _kafkaStarter.start();
+ _kafkaStarter.createTopic("airlineStatsEvents",
KafkaStarterUtils.getTopicCreationProps(10));
}
public void execute()
@@ -153,7 +162,7 @@ public class HybridQuickstart {
stream.shutdown();
Thread.sleep(2000);
runner.stop();
- KafkaStarterUtils.stopServer(_kafkaStarter);
+ _kafkaStarter.stop();
ZkStarter.stopLocalZkServer(_zookeeperInstance);
FileUtils.deleteDirectory(_offlineQuickStartDataDir);
FileUtils.deleteDirectory(_realtimeQuickStartDataDir);
@@ -163,9 +172,4 @@ public class HybridQuickstart {
}
});
}
-
- public static void main(String[] args)
- throws Exception {
- new HybridQuickstart().execute();
- }
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/KafkaStarterUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/KafkaStarterUtils.java
new file mode 100644
index 0000000..73da8dc
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/KafkaStarterUtils.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+
+
+public class KafkaStarterUtils {
+ public static final int DEFAULT_BROKER_ID = 0;
+ public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + "/kafka";
+ public static int DEFAULT_KAFKA_PORT = 19092;
+ public static final String DEFAULT_KAFKA_BROKER = "localhost:" + DEFAULT_KAFKA_PORT;
+
+ public static final String PORT = "port";
+ public static final String BROKER_ID = "broker.id";
+ private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+ private static final String LOG_DIRS = "log.dirs";
+
+ public static final String KAFKA_SERVER_STARTABLE_CLASS_NAME =
+ "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+ public static final String KAFKA_PRODUCER_CLASS_NAME =
+ "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer";
+ public static final String KAFKA_JSON_MESSAGE_DECODER_CLASS_NAME =
+ "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder";
+
+ public static Properties getDefaultKafkaConfiguration() {
+ final Properties configuration = new Properties();
+
+ // Enable topic deletion by default for integration tests
+ configureTopicDeletion(configuration, true);
+
+ // Set host name
+ configureHostName(configuration, "localhost");
+
+ configuration.put(PORT, DEFAULT_KAFKA_PORT);
+ configuration.put(BROKER_ID, DEFAULT_BROKER_ID);
+ configuration.put(ZOOKEEPER_CONNECT, DEFAULT_ZK_STR);
+ configuration.put(LOG_DIRS, "/tmp/kafka-" + Double.toHexString(Math.random()));
+
+ return configuration;
+ }
+
+ public static void configureTopicDeletion(Properties configuration, boolean topicDeletionEnabled) {
+ configuration.put("delete.topic.enable", Boolean.toString(topicDeletionEnabled));
+ }
+
+ public static void configureHostName(Properties configuration, String hostName) {
+ configuration.put("host.name", hostName);
+ }
+
+ public static Properties getTopicCreationProps(int numKafkaPartitions) {
+ Properties topicProps = new Properties();
+ topicProps.put("partition", numKafkaPartitions);
+ return topicProps;
+ }
+
+ public static List<StreamDataServerStartable> startServers(final int brokerCount, final int port, final String zkStr,
+ final Properties configuration) {
+ List<StreamDataServerStartable> startables = new ArrayList<>(brokerCount);
+
+ for (int i = 0; i < brokerCount; i++) {
+ startables.add(startServer(port + i, i, zkStr, configuration));
+ }
+ return startables;
+ }
+
+ public static StreamDataServerStartable startServer(final int port, final int brokerId, final String zkStr,
+ final Properties configuration) {
+ StreamDataServerStartable kafkaStarter;
+ try {
+ configuration.put(KafkaStarterUtils.PORT, port);
+ configuration.put(KafkaStarterUtils.BROKER_ID, brokerId);
+ configuration.put(KafkaStarterUtils.ZOOKEEPER_CONNECT, zkStr);
+ configuration.put(KafkaStarterUtils.LOG_DIRS, "/tmp/kafka-" + Double.toHexString(Math.random()));
+ kafkaStarter = StreamDataProvider.getServerDataStartable(KAFKA_SERVER_STARTABLE_CLASS_NAME, configuration);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start " +
KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
+ }
+ kafkaStarter.start();
+ return kafkaStarter;
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 8df1fbb..b970648 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -22,10 +22,11 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
-import kafka.server.KafkaServerStartable;
+import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.MeetupRsvpStream;
@@ -35,9 +36,16 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
public class RealtimeQuickStart {
+ private StreamDataServerStartable _kafkaStarter;
+
private RealtimeQuickStart() {
}
+ public static void main(String[] args)
+ throws Exception {
+ new RealtimeQuickStart().execute();
+ }
+
public void execute()
throws Exception {
final File quickStartDataDir = new File("quickStartData" + System.currentTimeMillis());
@@ -64,10 +72,16 @@ public class RealtimeQuickStart {
printStatus(Color.CYAN, "***** Starting Kafka *****");
final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
- final KafkaServerStartable kafkaStarter = KafkaStarterUtils
- .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
- KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
- KafkaStarterUtils.createTopic("meetupRSVPEvents",
KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+
+ String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+ try {
+ _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz, KafkaStarterUtils.getDefaultKafkaConfiguration());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start " + kafkaClazz, e);
+ }
+ _kafkaStarter.start();
+ _kafkaStarter.createTopic("meetupRSVPEvents",
KafkaStarterUtils.getTopicCreationProps(10));
+
printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and
broker *****");
runner.startAll();
printStatus(Color.CYAN, "***** Adding meetupRSVP schema *****");
@@ -87,7 +101,7 @@ public class RealtimeQuickStart {
printStatus(Color.GREEN, "***** Shutting down realtime quick start
*****");
meetupRSVPProvider.stopPublishing();
runner.stop();
- KafkaStarterUtils.stopServer(kafkaStarter);
+ _kafkaStarter.stop();
ZkStarter.stopLocalZkServer(zookeeperInstance);
FileUtils.deleteDirectory(quickStartDataDir);
} catch (Exception e) {
@@ -130,9 +144,4 @@ public class RealtimeQuickStart {
printStatus(Color.GREEN, "You can always go to
http://localhost:9000/query/ to play around in the query console");
}
-
- public static void main(String[] args)
- throws Exception {
- new RealtimeQuickStart().execute();
- }
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
index 38f042c..53a1d1c 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
@@ -20,8 +20,10 @@ package org.apache.pinot.tools.admin.command;
import java.io.File;
import java.io.IOException;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Command;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +34,7 @@ import org.slf4j.LoggerFactory;
*/
public class StartKafkaCommand extends AbstractBaseAdminCommand implements Command {
private static final Logger LOGGER = LoggerFactory.getLogger(StartKafkaCommand.class);
+
@Option(name = "-port", required = false, metaVar = "<int>", usage = "Port
to start Kafka server on.")
private int _port = KafkaStarterUtils.DEFAULT_KAFKA_PORT;
@@ -43,6 +46,7 @@ public class StartKafkaCommand extends AbstractBaseAdminCommand implements Comma
@Option(name = "-zkAddress", required = false, metaVar = "<string>", usage =
"Address of Zookeeper.")
private String _zkAddress = "localhost:2181";
+ private StreamDataServerStartable _kafkaStarter;
@Override
public boolean getHelp() {
@@ -67,7 +71,13 @@ public class StartKafkaCommand extends AbstractBaseAdminCommand implements Comma
@Override
public boolean execute()
throws IOException {
- KafkaStarterUtils.startServer(_port, _brokerId, _zkAddress, KafkaStarterUtils.getDefaultKafkaConfiguration());
+ String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+ try {
+ _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz, KafkaStarterUtils.getDefaultKafkaConfiguration());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start " + kafkaClazz, e);
+ }
+ _kafkaStarter.start();
LOGGER.info("Start kafka at localhost:" + _port + " in thread " +
Thread.currentThread().getName());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 0a75023..e9639a8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -24,15 +24,14 @@ import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
import org.apache.pinot.core.util.AvroUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
import org.apache.pinot.tools.Command;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +42,6 @@ import org.slf4j.LoggerFactory;
*/
public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand implements Command {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamAvroIntoKafkaCommand.class);
-
@Option(name = "-avroFile", required = true, metaVar = "<String>", usage =
"Avro file to stream.")
private String _avroFile = null;
@@ -104,8 +102,12 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
- ProducerConfig producerConfig = new ProducerConfig(properties);
- Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);
+ StreamDataProducer streamDataProducer;
+ try {
+ streamDataProducer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get StreamDataProducer - " +
KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, e);
+ }
try {
// Open the Avro file
DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(new File(_avroFile));
@@ -115,11 +117,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
// Write the message to Kafka
String recordJson = genericRecord.toString();
byte[] bytes = recordJson.getBytes("utf-8");
- KeyedMessage<byte[], byte[]> data =
- new KeyedMessage<byte[], byte[]>(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)),
- bytes);
-
- producer.send(data);
+ streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
// Sleep between messages
if (sleepRequired) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index 7c238bf..1d6a4b1 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -27,9 +27,6 @@ import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -37,7 +34,9 @@ import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.data.TimeFieldSpec;
import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.apache.pinot.tools.Quickstart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,12 +50,12 @@ public class AirlineDataStream {
DataFileStream<GenericRecord> avroDataStream;
Integer currentTimeValue = 16102;
boolean keepIndexing = true;
- private Producer<String, byte[]> producer;
ExecutorService service;
int counter = 0;
+ private StreamDataProducer producer;
public AirlineDataStream(Schema pinotSchema, File avroFile)
- throws FileNotFoundException, IOException {
+ throws Exception {
this.pinotSchema = pinotSchema;
this.avroFile = avroFile;
createStream();
@@ -65,8 +64,8 @@ public class AirlineDataStream {
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
- ProducerConfig producerConfig = new ProducerConfig(properties);
- producer = new Producer<String, byte[]>(producerConfig);
+ producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+
service = Executors.newFixedThreadPool(1);
Quickstart.printStatus(Quickstart.Color.YELLOW,
"***** Offine data has max time as 16101, realtime will start
consuming from time 16102 and increment time every 3000 events *****");
@@ -97,9 +96,7 @@ public class AirlineDataStream {
avroDataStream = null;
return;
}
- KeyedMessage<String, byte[]> data =
- new KeyedMessage<String, byte[]>("airlineStatsEvents",
message.toString().getBytes("UTF-8"));
- producer.send(data);
+ producer.produce("airlineStatsEvents",
message.toString().getBytes("UTF-8"));
}
public void run() {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index 114072f..d0d9ed1 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -23,40 +23,39 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
import java.util.Properties;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.tools.KafkaStarterUtils;
import org.glassfish.tyrus.client.ClientManager;
public class MeetupRsvpStream {
+
+ private static final String DEFAULT_KAFKA_BROKER = "localhost:19092";
+
private Schema schema;
- private Producer<String, byte[]> producer;
+ private StreamDataProducer producer;
private boolean keepPublishing = true;
private ClientManager client;
public MeetupRsvpStream(File schemaFile)
- throws IOException, URISyntaxException {
+ throws Exception {
schema = Schema.fromFile(schemaFile);
-
Properties properties = new Properties();
- properties.put("metadata.broker.list",
KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+ properties.put("metadata.broker.list", DEFAULT_KAFKA_BROKER);
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
-
- ProducerConfig producerConfig = new ProducerConfig(properties);
- producer = new Producer<String, byte[]>(producerConfig);
+ producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
}
public void stopPublishing() {
@@ -66,9 +65,9 @@ public class MeetupRsvpStream {
public void run() {
try {
-
final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
- final KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
+ final StreamMessageDecoder decoder =
+ (StreamMessageDecoder) Class.forName(KafkaStarterUtils.KAFKA_JSON_MESSAGE_DECODER_CLASS_NAME).newInstance();
decoder.init(null, schema, null);
client = ClientManager.createClient();
client.connectToServer(new Endpoint() {
@@ -108,9 +107,7 @@ public class MeetupRsvpStream {
extracted.put("rsvp_count", 1);
if (keepPublishing) {
- KeyedMessage<String, byte[]> data =
- new KeyedMessage<String, byte[]>("meetupRSVPEvents",
extracted.toString().getBytes("UTF-8"));
- producer.send(data);
+ producer.produce("meetupRSVPEvents",
extracted.toString().getBytes(StandardCharsets.UTF_8));
}
} catch (Exception e) {
//LOGGER.error("error processing raw event ", e);
diff --git a/pom.xml b/pom.xml
index a734f94..0d00729 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,8 +141,7 @@
kafka dependency is still explicitly defined in pinot-integration-tests, pinot-tools and pinot-perf pom files.
To change kafka connector dependency, we only need to update this version number config.
TODO: figure out a way to inject kafka dependency instead of explicitly setting the kafka module dependency -->
- <kafka.version>0.9</kafka.version>
- <kafka.scala.version>2.10</kafka.scala.version>
+ <kafka.lib.version>0.9</kafka.lib.version>
</properties>
<profiles>
@@ -929,12 +928,6 @@
<artifactId>jopt-simple</artifactId>
<version>4.6</version>
</dependency>
- <!-- kafka_2.10 & larray use scala-library -->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>2.10.5</version>
- </dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>