This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch kafka_2.0 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit d9e031618c4c7fa28e64d62858aa7e4a36d6f279 Author: Xiang Fu <[email protected]> AuthorDate: Mon Jul 1 16:17:25 2019 -0700 Adding support for Kafka 2.0 --- pinot-connectors/pinot-connector-kafka-0.9/pom.xml | 5 + pinot-connectors/pinot-connector-kafka-2.0/pom.xml | 112 +++++--- ...umerFactory.java => Kafka2ConsumerFactory.java} | 36 +-- .../impl/kafka2/Kafka2ConsumerManager.java | 191 ++++++++++++++ .../impl/kafka2/Kafka2HighLevelStreamConfig.java | 135 ++++++++++ .../realtime/impl/kafka2/Kafka2MessageBatch.java | 61 +++++ .../Kafka2PartitionLevelConnectionHandler.java | 67 +++++ ...Kafka2PartitionLevelPartitionLevelConsumer.java | 65 +++++ .../kafka2/Kafka2PartitionLevelStreamConfig.java | 146 +++++++++++ ...Kafka2PartitionLevelStreamMetadataProvider.java | 67 +++++ ...ties.java => Kafka2StreamConfigProperties.java} | 32 +-- .../impl/kafka2/Kafka2StreamLevelConsumer.java | 166 ++++++++++++ .../impl/kafka2/KafkaAvroMessageDecoder.java | 290 +++++++++++++++++++++ .../impl/kafka2/KafkaConnectionHandler.java | 61 ----- .../impl/kafka2/KafkaJSONMessageDecoder.java | 63 +++++ .../realtime/impl/kafka2/KafkaMessageBatch.java | 65 ----- .../impl/kafka2/KafkaPartitionConsumer.java | 51 ---- .../kafka2/KafkaPartitionLevelStreamConfig.java | 144 ---------- .../impl/kafka2/KafkaStreamMetadataProvider.java | 81 ------ .../realtime/impl/kafka2/MessageAndOffset.java | 42 +-- .../kafka2/KafkaPartitionLevelConsumerTest.java | 232 +++++++++++++++++ .../KafkaPartitionLevelStreamConfigTest.java | 161 ++++++++++++ .../impl/kafka2/utils/EmbeddedZooKeeper.java | 60 +++++ .../impl/kafka2/utils/MiniKafkaCluster.java | 175 +++++++++++++ pinot-connectors/pom.xml | 12 + 25 files changed, 2024 insertions(+), 496 deletions(-) diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml index ae0317e..852c29c 100644 --- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml +++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml @@ -63,5 +63,10 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>2.10.5</version> + </dependency> </dependencies> </project> diff --git a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml index f351219..2a9c155 100644 --- a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml +++ b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml @@ -22,46 +22,82 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>pinot-connectors</artifactId> - <groupId>org.apache.pinot</groupId> - <version>0.2.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>pinot-connectors</artifactId> + <groupId>org.apache.pinot</groupId> + <version>0.2.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>pinot-connector-kafka-2.0</artifactId> + <name>Pinot Connector Kafka 2.0</name> + <url>https://pinot.apache.org/</url> + <properties> + <pinot.root>${basedir}/../..</pinot.root> + <kafka.version>2.0.0</kafka.version> + </properties> - <artifactId>pinot-connector-kafka-2.0</artifactId> + <dependencies> - <properties> - <pinot.root>${basedir}/../..</pinot.root> - <kafka.version>2.0.0</kafka.version> - </properties> 
+ <!-- Kafka --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> - <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.12</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> - <!-- Kafka --> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>${kafka.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>net.sf.jopt-simple</groupId> - <artifactId>jopt-simple</artifactId> - </exclusion> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>2.12.8</version> + <scope>test</scope> + </dependency> + </dependencies> </project> \ No newline at end of file diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java similarity index 57% rename from pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java rename to pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java index cc3d8a6..3eab517 100644 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java @@ -26,24 +26,26 @@ import org.apache.pinot.core.realtime.stream.StreamConsumerFactory; import org.apache.pinot.core.realtime.stream.StreamLevelConsumer; import org.apache.pinot.core.realtime.stream.StreamMetadataProvider; -public class KafkaConsumerFactory extends StreamConsumerFactory { - @Override - public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) { - return new KafkaPartitionConsumer(_streamConfig, partition); - } - @Override - 
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { - throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers"); - } +public class Kafka2ConsumerFactory extends StreamConsumerFactory { + @Override + public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) { + return new Kafka2PartitionLevelPartitionLevelConsumer(clientId, _streamConfig, partition); + } - @Override - public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) { - return null; - } + @Override + public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, + InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { + return new Kafka2StreamLevelConsumer(clientId, tableName, _streamConfig, schema, instanceZKMetadata, serverMetrics); + } - @Override - public StreamMetadataProvider createStreamMetadataProvider(String clientId) { - throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers"); - } + @Override + public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) { + return new Kafka2PartitionLevelStreamMetadataProvider(clientId, _streamConfig, partition); + } + + @Override + public StreamMetadataProvider createStreamMetadataProvider(String clientId) { + return new Kafka2PartitionLevelStreamMetadataProvider(clientId, _streamConfig); + } } diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java new file mode 100644 index 0000000..74e3ee2 --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Manager for Kafka consumers that reuses consumers and delays their shutdown. + * + * This is a workaround for the current realtime design flaw where any issue while flushing/committing offsets causes + * duplicate or dropped events. Kafka consumption is driven by the controller, which assigns a realtime segment to the + * servers; when a server is assigned a new realtime segment, it creates a Kafka consumer, consumes until it reaches a + * threshold, then flushes to disk, writes metadata to helix indicating the segment is completed, commits Kafka offsets + * to ZK and then shuts down the consumer. The controller notices the metadata write and reassigns a segment to the + * server, so that it can keep on consuming. + * + * This logic is flawed if committing Kafka offsets fails, at which time the committed state is unknown. The proper fix + * would be to just keep on using that consumer and try committing our offsets later, but we create a new Kafka + * consumer whenever we get a new segment and also keep the old consumer around, leading to half the events being + * assigned, due to Kafka rebalancing the partitions between the two consumers (one of which is not actually reading + * anything anymore). Because that logic is stateless and driven by Helix, there's no real clean way to keep the + * consumer alive and pass it to the next segment. + * + * This class and its long comment exist to work around this issue by keeping the consumer alive for a little bit instead of + * shutting it down immediately, so that the next segment assignment can pick up the same consumer. This way, even if + * committing the offsets fails, we can still pick up the same consumer the next time we get a segment assigned to us + * by the controller and hopefully commit our offsets the next time we flush to disk. + * + * This temporary code should be completely removed by the time we redesign the consumption to use the lower level + * Kafka APIs.
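+ * + * Typical usage, as wired up in {@link Kafka2StreamLevelConsumer}: acquire a consumer in start() through + * acquireKafkaConsumerForConfig(config), poll it while the segment is consuming, and hand it back through + * releaseKafkaConsumer(consumer) in shutdown(), so that the next segment assignment can reuse it.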
+ */ +public class Kafka2ConsumerManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(Kafka2ConsumerManager.class); + private static final Long IN_USE = -1L; + private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(60); // One minute + private static final Map<ImmutableTriple<String, String, String>, KafkaConsumer> CONSUMER_FOR_CONFIG_KEY = + new HashMap<>(); + private static final IdentityHashMap<KafkaConsumer, Long> CONSUMER_RELEASE_TIME = new IdentityHashMap<>(); + + public static KafkaConsumer acquireKafkaConsumerForConfig(Kafka2HighLevelStreamConfig kafka2HighLevelStreamConfig) { + final ImmutableTriple<String, String, String> configKey = + new ImmutableTriple<>(kafka2HighLevelStreamConfig.getKafkaTopicName(), kafka2HighLevelStreamConfig.getGroupId(), + kafka2HighLevelStreamConfig.getBootstrapServers()); + + synchronized (Kafka2ConsumerManager.class) { + // If we have the consumer and it's not already acquired, return it; otherwise, error out because it's already acquired + if (CONSUMER_FOR_CONFIG_KEY.containsKey(configKey)) { + KafkaConsumer kafkaConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey); + if (CONSUMER_RELEASE_TIME.get(kafkaConsumer).equals(IN_USE)) { + throw new RuntimeException("Consumer/iterator " + kafkaConsumer + " already in use!"); + } else { + LOGGER.info("Reusing kafka consumer/iterator with id {}", kafkaConsumer); + CONSUMER_RELEASE_TIME.put(kafkaConsumer, IN_USE); + return kafkaConsumer; + } + } + + LOGGER.info("Creating new kafka consumer and iterator for topic {}", + kafka2HighLevelStreamConfig.getKafkaTopicName()); + + // Create the consumer + + Properties consumerProp = new Properties(); + consumerProp.putAll(kafka2HighLevelStreamConfig.getKafkaConsumerProperties()); + consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka2HighLevelStreamConfig.getBootstrapServers()); + consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProp); + consumer.subscribe(Collections.singletonList(kafka2HighLevelStreamConfig.getKafkaTopicName())); + + // Mark both the consumer and iterator as acquired + CONSUMER_FOR_CONFIG_KEY.put(configKey, consumer); + CONSUMER_RELEASE_TIME.put(consumer, IN_USE); + + LOGGER.info("Created consumer/iterator with id {} for topic {}", consumer, + kafka2HighLevelStreamConfig.getKafkaTopicName()); + + return consumer; + } + } + + public static void releaseKafkaConsumer(final KafkaConsumer kafkaConsumer) { + synchronized (Kafka2ConsumerManager.class) { + // Release the consumer, mark it for shutdown in the future + final long releaseTime = System.currentTimeMillis() + CONSUMER_SHUTDOWN_DELAY_MILLIS; + CONSUMER_RELEASE_TIME.put(kafkaConsumer, releaseTime); + + LOGGER.info("Marking consumer/iterator with id {} for release at {}", kafkaConsumer, releaseTime); + + // Schedule the shutdown of the consumer + new Thread() { + @Override + public void run() { + try { + // Await the shutdown time + Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS, TimeUnit.MILLISECONDS); + + // Shutdown all consumers that have not been re-acquired + synchronized (Kafka2ConsumerManager.class) { + LOGGER.info("Executing release check for consumer/iterator {} at {}, scheduled at {}", kafkaConsumer, + System.currentTimeMillis(), releaseTime); + + Iterator<Map.Entry<ImmutableTriple<String, String, String>,
KafkaConsumer>> configIterator = + CONSUMER_FOR_CONFIG_KEY.entrySet().iterator(); + + while (configIterator.hasNext()) { + Map.Entry<ImmutableTriple<String, String, String>, KafkaConsumer> entry = configIterator.next(); + KafkaConsumer kafkaConsumer = entry.getValue(); + + final Long releaseTime = CONSUMER_RELEASE_TIME.get(kafkaConsumer); + if (!releaseTime.equals(IN_USE) && releaseTime < System.currentTimeMillis()) { + LOGGER.info("Releasing consumer/iterator {}", kafkaConsumer); + + try { + kafkaConsumer.close(); + } catch (Exception e) { + LOGGER.warn("Caught exception while shutting down Kafka consumer with id {}", kafkaConsumer, e); + } + + configIterator.remove(); + CONSUMER_RELEASE_TIME.remove(kafkaConsumer); + } else { + LOGGER.info("Not releasing consumer/iterator {}, it has been reacquired", kafkaConsumer); + } + } + } + } catch (Exception e) { + LOGGER.warn("Caught exception in release of consumer/iterator {}", kafkaConsumer, e); + } + } + }.start(); + } + } + + public static void closeAllConsumers() { + try { + // Shutdown all consumers + synchronized (Kafka2ConsumerManager.class) { + LOGGER.info("Trying to shutdown all the kafka consumers"); + Iterator<KafkaConsumer> consumerIterator = CONSUMER_FOR_CONFIG_KEY.values().iterator(); + + while (consumerIterator.hasNext()) { + KafkaConsumer kafkaConsumer = consumerIterator.next(); + LOGGER.info("Trying to shutdown consumer/iterator {}", kafkaConsumer); + try { + kafkaConsumer.close(); + } catch (Exception e) { + LOGGER.warn("Caught exception while shutting down Kafka consumer with id {}", kafkaConsumer, e); + } + consumerIterator.remove(); + } + CONSUMER_FOR_CONFIG_KEY.clear(); + CONSUMER_RELEASE_TIME.clear(); + } + } catch (Exception e) { + LOGGER.warn("Caught exception while shutting down all kafka consumers", e); + } + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java new file mode 100644 index 0000000..f866288 --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; +import org.apache.pinot.common.utils.EqualityUtils; +import org.apache.pinot.core.realtime.stream.StreamConfig; +import org.apache.pinot.core.realtime.stream.StreamConfigProperties; + + +/** + * Wrapper around {@link StreamConfig} for use in the {@link Kafka2StreamLevelConsumer} + */ +public class Kafka2HighLevelStreamConfig { + private static final String DEFAULT_AUTO_COMMIT_ENABLE = "false"; + + private static final Map<String, String> defaultProps; + private String _kafkaTopicName; + private String _groupId; + private String _bootstrapServers; + private Map<String, String> _kafkaConsumerProperties; + + /** + * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level consumer specific configs + * @param streamConfig + * @param tableName + * @param instanceZKMetadata + */ + public Kafka2HighLevelStreamConfig(StreamConfig streamConfig, String tableName, + InstanceZKMetadata instanceZKMetadata) { + Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap(); + + _kafkaTopicName = streamConfig.getTopicName(); + String hlcBootstrapBrokerUrlKey = Kafka2StreamConfigProperties + .constructStreamProperty(Kafka2StreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER); + _bootstrapServers = streamConfigMap.get(hlcBootstrapBrokerUrlKey); + Preconditions.checkNotNull(_bootstrapServers, + "Must specify bootstrap broker connect string " + hlcBootstrapBrokerUrlKey + " in high level kafka consumer"); + _groupId = instanceZKMetadata.getGroupId(tableName); + + _kafkaConsumerProperties = new HashMap<>(); + String kafkaConsumerPropertyPrefix = + Kafka2StreamConfigProperties.constructStreamProperty(Kafka2StreamConfigProperties.KAFKA_CONSUMER_PROP_PREFIX); + for (String key : streamConfigMap.keySet()) { + if (key.startsWith(kafkaConsumerPropertyPrefix)) { + _kafkaConsumerProperties + .put(StreamConfigProperties.getPropertySuffix(key, kafkaConsumerPropertyPrefix), streamConfigMap.get(key)); + } + } + } + + public String getKafkaTopicName() { + return _kafkaTopicName; + } + + public String getGroupId() { + return _groupId; + } + + public Properties getKafkaConsumerProperties() { + Properties props = new Properties(); + for (String key : defaultProps.keySet()) { + props.put(key, defaultProps.get(key)); + } + for (String key : _kafkaConsumerProperties.keySet()) { + props.put(key, _kafkaConsumerProperties.get(key)); + } + props.put("group.id", _groupId); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServers); + return props; + } + + @Override + public String toString() { + return "Kafka2HighLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _groupId='" + _groupId + + '\'' + ", _bootstrapServers='" + _bootstrapServers + '\'' + ", _kafkaConsumerProperties=" + + _kafkaConsumerProperties + '}'; + } + + @Override + public boolean equals(Object o) { + if (EqualityUtils.isSameReference(this, o)) { + return true; + } + + if (EqualityUtils.isNullOrNotSameClass(this, o)) { + return false; + } + + Kafka2HighLevelStreamConfig that = (Kafka2HighLevelStreamConfig) o; + + return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils + .isEqual(_groupId, that._groupId) && EqualityUtils.isEqual(_bootstrapServers, 
that._bootstrapServers) + && EqualityUtils.isEqual(_kafkaConsumerProperties, that._kafkaConsumerProperties); + } + + @Override + public int hashCode() { + int result = EqualityUtils.hashCodeOf(_kafkaTopicName); + result = EqualityUtils.hashCodeOf(result, _groupId); + result = EqualityUtils.hashCodeOf(result, _bootstrapServers); + result = EqualityUtils.hashCodeOf(result, _kafkaConsumerProperties); + return result; + } + + public String getBootstrapServers() { + return _bootstrapServers; + } + + static { + defaultProps = new HashMap<>(); + defaultProps.put(Kafka2StreamConfigProperties.HighLevelConsumer.AUTO_COMMIT_ENABLE, DEFAULT_AUTO_COMMIT_ENABLE); + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java new file mode 100644 index 0000000..13bd41b --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.pinot.core.realtime.stream.MessageBatch; + + +public class Kafka2MessageBatch implements MessageBatch<byte[]> { + + private List<MessageAndOffset> messageList = new ArrayList<>(); + + public Kafka2MessageBatch(Iterable<ConsumerRecord<String, byte[]>> iterable) { + for (ConsumerRecord<String, byte[]> record : iterable) { + messageList.add(new MessageAndOffset(record.value(), record.offset())); + } + } + + @Override + public int getMessageCount() { + return messageList.size(); + } + + @Override + public byte[] getMessageAtIndex(int index) { + return messageList.get(index).getMessage().array(); + } + + @Override + public int getMessageOffsetAtIndex(int index) { + return messageList.get(index).getMessage().arrayOffset(); + } + + @Override + public int getMessageLengthAtIndex(int index) { + return messageList.get(index).payloadSize(); + } + + @Override + public long getNextStreamMessageOffsetAtIndex(int index) { + return messageList.get(index).getNextOffset(); + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java new file mode 100644 index 0000000..3f2550d --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.pinot.core.realtime.stream.StreamConfig; + + +public abstract class Kafka2PartitionLevelConnectionHandler { + + protected final Kafka2PartitionLevelStreamConfig _config; + protected final String _clientId; + protected final int _partition; + protected final String _topic; + protected final Consumer<String, byte[]> _consumer; + protected final TopicPartition _topicPartition; + + public Kafka2PartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) { + _config = new Kafka2PartitionLevelStreamConfig(streamConfig); + _clientId = clientId; + _partition = partition; + _topic = _config.getKafkaTopicName(); + Properties consumerProp = new Properties(); + consumerProp.putAll(streamConfig.getStreamConfigsMap()); + consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts()); + consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); + _consumer = new KafkaConsumer<>(consumerProp); + _topicPartition = new TopicPartition(_topic, _partition); + _consumer.assign(Collections.singletonList(_topicPartition)); + } + + public void close() + throws IOException { + _consumer.close(); + } + + @VisibleForTesting + protected Kafka2PartitionLevelStreamConfig getKafka2PartitionLevelStreamConfig() { + return _config; + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java new file mode 100644 index 0000000..19f520a --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.pinot.core.realtime.stream.MessageBatch; +import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer; +import org.apache.pinot.core.realtime.stream.StreamConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Kafka2PartitionLevelPartitionLevelConsumer extends Kafka2PartitionLevelConnectionHandler implements PartitionLevelConsumer { + private static final Logger LOGGER = LoggerFactory.getLogger(Kafka2PartitionLevelPartitionLevelConsumer.class); + + public Kafka2PartitionLevelPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) { + super(clientId, streamConfig, partition); + } + + @Override + public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) + throws TimeoutException { + _consumer.seek(_topicPartition, startOffset); + ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis)); + final Iterable<ConsumerRecord<String, byte[]>> messageAndOffsetIterable = + buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset); + return new Kafka2MessageBatch(messageAndOffsetIterable); + } + + private Iterable<ConsumerRecord<String, byte[]>> buildOffsetFilteringIterable( + final List<ConsumerRecord<String, byte[]>> messageAndOffsets, final long startOffset, final long endOffset) { + return Iterables.filter(messageAndOffsets, input -> { + // Filter messages that are either null or have an offset ∉ [startOffset, endOffset] + return input != null && input.offset() >= startOffset && (endOffset > input.offset() || endOffset == -1); + }); + } + + @Override + public void close() + throws IOException { + super.close(); + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java new file mode 100644 index 0000000..fcc0e04 --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import com.google.common.base.Preconditions; +import java.util.Map; +import org.apache.commons.lang.StringUtils; +import org.apache.pinot.common.utils.EqualityUtils; +import org.apache.pinot.core.realtime.stream.StreamConfig; + + +/** + * Wrapper around {@link StreamConfig} for use in {@link Kafka2PartitionLevelPartitionLevelConsumer} + */ +public class Kafka2PartitionLevelStreamConfig { + + private final String _kafkaTopicName; + private final String _bootstrapHosts; + private final int _kafkaBufferSize; + private final int _kafkaSocketTimeout; + private final int _kafkaFetcherSizeBytes; + private final int _kafkaFetcherMinBytes; + private final Map<String, String> _streamConfigMap; + + /** + * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs + * @param streamConfig + */ + public Kafka2PartitionLevelStreamConfig(StreamConfig streamConfig) { + _streamConfigMap = streamConfig.getStreamConfigsMap(); + + _kafkaTopicName = streamConfig.getTopicName(); + + String llcBrokerListKey = Kafka2StreamConfigProperties + .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST); + String llcBufferKey = Kafka2StreamConfigProperties + .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE); + String llcTimeoutKey = Kafka2StreamConfigProperties + .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT); + String fetcherSizeKey = Kafka2StreamConfigProperties + .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES); + String fetcherMinBytesKey = Kafka2StreamConfigProperties + .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES); + _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey); + _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey, + Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT); + _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, llcTimeoutKey, + Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT); + _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize); + _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey, + Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT); + Preconditions.checkNotNull(_bootstrapHosts, + "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer"); + } + + public String getKafkaTopicName() { + return _kafkaTopicName; + } + + public String getBootstrapHosts() { + return _bootstrapHosts; + } + + public int getKafkaBufferSize() { + return _kafkaBufferSize; + } + + public int getKafkaSocketTimeout() { + return _kafkaSocketTimeout; + } + + public int getKafkaFetcherSizeBytes() { + return _kafkaFetcherSizeBytes; + } + + public int getKafkaFetcherMinBytes() { + return _kafkaFetcherMinBytes; + } + + private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) { + String stringValue = configMap.get(key); + try { + if (StringUtils.isNotEmpty(stringValue)) { + return Integer.parseInt(stringValue); + } + return defaultValue; + } catch (NumberFormatException ex) { + return defaultValue; + } + } + + @Override + public String toString() { + return "Kafka2PartitionLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", 
_bootstrapHosts='" + + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + '\'' + ", _kafkaSocketTimeout='" + + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + _kafkaFetcherSizeBytes + '\'' + + ", _kafkaFetcherMinBytes='" + _kafkaFetcherMinBytes + '\'' + '}'; + } + + @Override + public boolean equals(Object o) { + if (EqualityUtils.isSameReference(this, o)) { + return true; + } + + if (EqualityUtils.isNullOrNotSameClass(this, o)) { + return false; + } + + Kafka2PartitionLevelStreamConfig that = (Kafka2PartitionLevelStreamConfig) o; + + return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils + .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils + .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils + .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils + .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils + .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes); + } + + @Override + public int hashCode() { + int result = EqualityUtils.hashCodeOf(_kafkaTopicName); + result = EqualityUtils.hashCodeOf(result, _bootstrapHosts); + result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize); + result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout); + result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes); + result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes); + return result; + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java new file mode 100644 index 0000000..7a0558d --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; +import org.apache.pinot.core.realtime.stream.OffsetCriteria; +import org.apache.pinot.core.realtime.stream.StreamConfig; +import org.apache.pinot.core.realtime.stream.StreamMetadataProvider; + + +public class Kafka2PartitionLevelStreamMetadataProvider extends Kafka2PartitionLevelConnectionHandler implements StreamMetadataProvider { + + public Kafka2PartitionLevelStreamMetadataProvider(String clientId, StreamConfig streamConfig) { + this(clientId, streamConfig, Integer.MIN_VALUE); + } + + public Kafka2PartitionLevelStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) { + super(clientId, streamConfig, partition); + } + + @Override + public int fetchPartitionCount(long timeoutMillis) { + return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); + } + + @Override + public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) + throws TimeoutException { + Preconditions.checkNotNull(offsetCriteria); + if (offsetCriteria.isLargest()) { + return _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)) + .get(_topicPartition); + } else if (offsetCriteria.isSmallest()) { + return _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)) + .get(_topicPartition); + } else { + throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString()); + } + } + + @Override + public void close() + throws IOException { + super.close(); + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java similarity index 76% rename from pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java rename to pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java index 3c45d6e..ed27dfc 100644 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java @@ -25,19 +25,22 @@ import org.apache.pinot.core.realtime.stream.StreamConfigProperties; /** * Property key definitions for all kafka stream related properties */ -public class KafkaStreamConfigProperties { +public class Kafka2StreamConfigProperties { public static final String DOT_SEPARATOR = "."; - public static final String STREAM_TYPE = "kafka"; + public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop"; + + /** + * Helper method to create a property string for kafka stream + * @param property + * @return + */ + public static String constructStreamProperty(String property) { + return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, property); + } public static class HighLevelConsumer { - public static final String KAFKA_HLC_ZK_CONNECTION_STRING = "kafka.hlc.zk.connect.string"; - 
public static final String ZK_SESSION_TIMEOUT_MS = "zookeeper.session.timeout.ms"; - public static final String ZK_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms"; - public static final String ZK_SYNC_TIME_MS = "zookeeper.sync.time.ms"; - public static final String REBALANCE_MAX_RETRIES = "rebalance.max.retries"; - public static final String REBALANCE_BACKOFF_MS = "rebalance.backoff.ms"; + public static final String KAFKA_HLC_BOOTSTRAP_SERVER = "kafka.hlc.bootstrap.server"; public static final String AUTO_COMMIT_ENABLE = "auto.commit.enable"; - public static final String AUTO_OFFSET_RESET = "auto.offset.reset"; } public static class LowLevelConsumer { @@ -50,16 +53,5 @@ public class KafkaStreamConfigProperties { public static final String KAFKA_FETCHER_MIN_BYTES = "kafka.fetcher.minBytes"; public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000; } - - public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop"; - - /** - * Helper method to create a property string for kafka stream - * @param property - * @return - */ - public static String constructStreamProperty(String property) { - return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, property); - } } diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java new file mode 100644 index 0000000..4bbf975 --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import com.yammer.metrics.core.Meter; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.core.data.GenericRow; +import org.apache.pinot.core.realtime.stream.StreamConfig; +import org.apache.pinot.core.realtime.stream.StreamDecoderProvider; +import org.apache.pinot.core.realtime.stream.StreamLevelConsumer; +import org.apache.pinot.core.realtime.stream.StreamMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An implementation of a {@link StreamLevelConsumer} which consumes from the kafka stream + */ +public class Kafka2StreamLevelConsumer implements StreamLevelConsumer { + + private StreamMessageDecoder _messageDecoder; + private Logger INSTANCE_LOGGER; + + private String _clientId; + private String _tableAndStreamName; + + private StreamConfig _streamConfig; + private Kafka2HighLevelStreamConfig _kafka2HighLevelStreamConfig; + + private KafkaConsumer<byte[], byte[]> consumer; + private ConsumerRecords<byte[], byte[]> consumerRecords; + private Iterator<ConsumerRecord<byte[], byte[]>> kafkaIterator; + private Map<Integer, Long> consumerOffsets = new HashMap<>(); // tracking current consumed records offsets. 
+ + private long lastLogTime = 0; + private long lastCount = 0; + private long currentCount = 0L; + + private ServerMetrics _serverMetrics; + private Meter tableAndStreamRowsConsumed = null; + private Meter tableRowsConsumed = null; + + public Kafka2StreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema, + InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) { + _clientId = clientId; + _streamConfig = streamConfig; + _kafka2HighLevelStreamConfig = new Kafka2HighLevelStreamConfig(streamConfig, tableName, instanceZKMetadata); + _serverMetrics = serverMetrics; + + _messageDecoder = StreamDecoderProvider.create(streamConfig, schema); + + _tableAndStreamName = tableName + "-" + streamConfig.getTopicName(); + INSTANCE_LOGGER = LoggerFactory + .getLogger(Kafka2StreamLevelConsumer.class.getName() + "_" + tableName + "_" + streamConfig.getTopicName()); + } + + @Override + public void start() + throws Exception { + consumer = Kafka2ConsumerManager.acquireKafkaConsumerForConfig(_kafka2HighLevelStreamConfig); + } + + private void updateKafkaIterator() { + consumerRecords = consumer.poll(Duration.ofMillis(_streamConfig.getFetchTimeoutMillis())); + kafkaIterator = consumerRecords.iterator(); + } + + @Override + public GenericRow next(GenericRow destination) { + // Guard the first call: the iterator is null until updateKafkaIterator() has polled once + if (kafkaIterator == null || !kafkaIterator.hasNext()) { + updateKafkaIterator(); + } + if (kafkaIterator.hasNext()) { + try { + final ConsumerRecord<byte[], byte[]> record = kafkaIterator.next(); + updateOffsets(record.partition(), record.offset()); + destination = _messageDecoder.decode(record.value(), destination); + tableAndStreamRowsConsumed = _serverMetrics + .addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L, + tableAndStreamRowsConsumed); + tableRowsConsumed = + _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, tableRowsConsumed); + + ++currentCount; + + final long now = System.currentTimeMillis(); + // Log every minute or 100k events + if (now - lastLogTime > 60000 || currentCount - lastCount >= 100000) { + if (lastCount == 0) { + INSTANCE_LOGGER.info("Consumed {} events from kafka stream {}", currentCount, _streamConfig.getTopicName()); + } else { + INSTANCE_LOGGER.info("Consumed {} events from kafka stream {} (rate:{}/s)", currentCount - lastCount, + _streamConfig.getTopicName(), (float) (currentCount - lastCount) * 1000 / (now - lastLogTime)); + } + lastCount = currentCount; + lastLogTime = now; + } + return destination; + } catch (Exception e) { + INSTANCE_LOGGER.warn("Caught exception while consuming events", e); + _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); + _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L); + throw e; + } + } + return null; + } + + private void updateOffsets(int partition, long offset) { + consumerOffsets.put(partition, offset); + } + + @Override + public void commit() { + consumer.commitSync(getOffsetsMap()); + consumerOffsets.clear(); + _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_OFFSET_COMMITS, 1L); + _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L); + } + + private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() { + Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>(); + for (Integer partition : consumerOffsets.keySet()) { + offsetsMap.put(new TopicPartition(_streamConfig.getTopicName(), partition), + new
OffsetAndMetadata(consumerOffsets.get(partition) + 1)); // commit the offset of the next message to consume, per commitSync's contract + } + return offsetsMap; + } + + @Override + public void shutdown() + throws Exception { + if (consumer != null) { + // Release the consumer before clearing the reference; otherwise a null consumer would be released + Kafka2ConsumerManager.releaseKafkaConsumer(consumer); + consumer = null; + } + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java new file mode 100644 index 0000000..5e09faf --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.commons.lang.StringUtils; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.common.utils.retry.RetryPolicies; +import org.apache.pinot.core.data.GenericRow; +import org.apache.pinot.core.realtime.stream.AvroRecordToPinotRowGenerator; +import org.apache.pinot.core.realtime.stream.StreamMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@NotThreadSafe +public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAvroMessageDecoder.class); + + private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url"; + private static final String SCHEMA_REGISTRY_SCHEMA_NAME = "schema.registry.schema.name"; + private org.apache.avro.Schema defaultAvroSchema; + private MD5AvroSchemaMap md5ToAvroSchemaMap; + + // A global cache for schemas across all threads. + private static final Map<String, org.apache.avro.Schema> globalSchemaCache = new HashMap<>(); + // Suffix for getting the latest schema + private static final String LATEST = "-latest"; + + // Reusable byte[] to read MD5 from payload. This is OK as this class is used only by a single thread.
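+ // Each payload handled by decode() is laid out as [1 magic byte][16-byte MD5 of the Avro schema][Avro binary record]; + // see MAGIC_BYTE_LENGTH, SCHEMA_HASH_LENGTH and HEADER_LENGTH below.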
+ private final byte[] reusableMD5Bytes = new byte[SCHEMA_HASH_LENGTH]; + + private DecoderFactory decoderFactory; + private AvroRecordToPinotRowGenerator avroRecordConverter; + + private static final int MAGIC_BYTE_LENGTH = 1; + private static final int SCHEMA_HASH_LENGTH = 16; + private static final int HEADER_LENGTH = MAGIC_BYTE_LENGTH + SCHEMA_HASH_LENGTH; + + private static final int SCHEMA_HASH_START_OFFSET = MAGIC_BYTE_LENGTH; + + private static final int MAXIMUM_SCHEMA_FETCH_RETRY_COUNT = 5; + private static final int MINIMUM_SCHEMA_FETCH_RETRY_TIME_MILLIS = 500; + private static final float SCHEMA_FETCH_RETRY_EXPONENTIAL_BACKOFF_FACTOR = 2.0f; + + private String[] schemaRegistryUrls; + + @Override + public void init(Map<String, String> props, Schema indexingSchema, String topicName) + throws Exception { + schemaRegistryUrls = parseSchemaRegistryUrls(props.get(SCHEMA_REGISTRY_REST_URL)); + + for (int i = 0; i < schemaRegistryUrls.length; i++) { + // chomp returns a new string, so the result must be stored back to take effect + schemaRegistryUrls[i] = StringUtils.chomp(schemaRegistryUrls[i], "/"); + } + + String avroSchemaName = topicName; + if (props.containsKey(SCHEMA_REGISTRY_SCHEMA_NAME) && props.get(SCHEMA_REGISTRY_SCHEMA_NAME) != null && !props + .get(SCHEMA_REGISTRY_SCHEMA_NAME).isEmpty()) { + avroSchemaName = props.get(SCHEMA_REGISTRY_SCHEMA_NAME); + } + // With the logic below, we may not set defaultAvroSchema to be the latest one every time. + // The schema is fetched once when the machine starts; until the next restart, the latest schema is + // not fetched. + // But then we always pay attention to the exact MD5 hash and attempt to fetch the schema for that particular hash + // before decoding an incoming kafka event. We use defaultAvroSchema only if the fetch for the particular MD5 fails, + // but then we will retry that fetch on every event in case of failure. + synchronized (globalSchemaCache) { + final String hashKey = avroSchemaName + LATEST; + defaultAvroSchema = globalSchemaCache.get(hashKey); + if (defaultAvroSchema == null) { + defaultAvroSchema = fetchSchema("/latest_with_type=" + avroSchemaName); + globalSchemaCache.put(hashKey, defaultAvroSchema); + LOGGER.info("Populated schema cache with schema for {}", hashKey); + } + } + this.avroRecordConverter = new AvroRecordToPinotRowGenerator(indexingSchema); + this.decoderFactory = new DecoderFactory(); + md5ToAvroSchemaMap = new MD5AvroSchemaMap(); + } + + @Override + public GenericRow decode(byte[] payload, GenericRow destination) { + return decode(payload, 0, payload.length, destination); + } + + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + if (payload == null || payload.length == 0 || length == 0) { + return null; + } + + System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, reusableMD5Bytes, 0, SCHEMA_HASH_LENGTH); + + boolean schemaUpdateFailed = false; + org.apache.avro.Schema schema = md5ToAvroSchemaMap.getSchema(reusableMD5Bytes); + if (schema == null) { + // We will get here for the first row consumed in the segment, and every row that has a schema ID that is + // not yet in md5ToAvroSchemaMap. + synchronized (globalSchemaCache) { + final String hashKey = hex(reusableMD5Bytes); + schema = globalSchemaCache.get(hashKey); + if (schema == null) { + // We will get here only if no partition of the table has populated the global schema cache.
+          // In that case, one of the consumers will fetch the schema and populate the cache, and the others
+          // should find it in the cache and populate their own md5ToAvroSchemaMap from it.
+          final String schemaUri = "/id=" + hex(reusableMD5Bytes);
+          try {
+            schema = fetchSchema(schemaUri);
+            globalSchemaCache.put(hashKey, schema);
+            md5ToAvroSchemaMap.addSchema(reusableMD5Bytes, schema);
+          } catch (Exception e) {
+            schema = defaultAvroSchema;
+            LOGGER
+                .error("Error fetching schema using url {}. Attempting to continue with previous schema", schemaUri, e);
+            schemaUpdateFailed = true;
+          }
+        } else {
+          LOGGER.info("Found schema for {} in cache", hashKey);
+          md5ToAvroSchemaMap.addSchema(reusableMD5Bytes, schema);
+        }
+      }
+    }
+    DatumReader<Record> reader = new GenericDatumReader<Record>(schema);
+    try {
+      Record avroRecord = reader.read(null,
+          decoderFactory.createBinaryDecoder(payload, HEADER_LENGTH + offset, length - HEADER_LENGTH, null));
+      return avroRecordConverter.transform(avroRecord, destination);
+    } catch (IOException e) {
+      LOGGER.error("Caught exception while reading message using schema {}{}",
+          (schema == null ? "null" : schema.getName()),
+          (schemaUpdateFailed ? "(possibly due to schema update failure)" : ""), e);
+      return null;
+    }
+  }
+
+  private String hex(byte[] bytes) {
+    StringBuilder builder = new StringBuilder(2 * bytes.length);
+    for (byte aByte : bytes) {
+      String hexString = Integer.toHexString(0xFF & aByte);
+      if (hexString.length() < 2) {
+        hexString = "0" + hexString;
+      }
+      builder.append(hexString);
+    }
+    return builder.toString();
+  }
+
+  private static class SchemaFetcher implements Callable<Boolean> {
+    private org.apache.avro.Schema _schema;
+    private URL url;
+    private boolean _isSuccessful = false;
+
+    SchemaFetcher(URL url) {
+      this.url = url;
+    }
+
+    @Override
+    public Boolean call()
+        throws Exception {
+      try {
+        URLConnection conn = url.openConnection();
+        conn.setConnectTimeout(15000);
+        conn.setReadTimeout(15000);
+        LOGGER.info("Fetching schema using url {}", url.toString());
+
+        StringBuilder queryResp = new StringBuilder();
+        try (BufferedReader reader = new BufferedReader(
+            new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
+          for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+            queryResp.append(line);
+          }
+        }
+
+        _schema = org.apache.avro.Schema.parse(queryResp.toString());
+
+        LOGGER.info("Schema fetch succeeded on url {}", url.toString());
+        return Boolean.TRUE;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while fetching schema", e);
+        return Boolean.FALSE;
+      }
+    }
+
+    public org.apache.avro.Schema getSchema() {
+      return _schema;
+    }
+  }
+
+  private org.apache.avro.Schema fetchSchema(String reference)
+      throws Exception {
+    SchemaFetcher schemaFetcher = new SchemaFetcher(makeRandomUrl(reference));
+    RetryPolicies
+        .exponentialBackoffRetryPolicy(MAXIMUM_SCHEMA_FETCH_RETRY_COUNT, MINIMUM_SCHEMA_FETCH_RETRY_TIME_MILLIS,
+            SCHEMA_FETCH_RETRY_EXPONENTIAL_BACKOFF_FACTOR).attempt(schemaFetcher);
+    return schemaFetcher.getSchema();
+  }
+
+  /**
+   * Private class for encapsulating MD5 to Avro schema mapping.
+   * <ul>
+   *   <li> Maintains two lists, one for md5s and another for schema. </li>
+   *   <li> MD5 at index i in the MD5 list, corresponds to Schema at index i in the schema list. </li>
+   * </ul>
+   */
+  private static class MD5AvroSchemaMap {
+    private List<byte[]> md5s;
+    private List<org.apache.avro.Schema> schemas;
+
+    /**
+     * Constructor for the class.
+ */ + private MD5AvroSchemaMap() { + md5s = new ArrayList<>(); + schemas = new ArrayList<>(); + } + + /** + * Returns the Avro schema corresponding to the given MD5. + * + * @param md5ForSchema MD5 for which to get the avro schema. + * @return Avro schema for the given MD5. + */ + private org.apache.avro.Schema getSchema(byte[] md5ForSchema) { + for (int i = 0; i < md5s.size(); i++) { + if (Arrays.equals(md5s.get(i), md5ForSchema)) { + return schemas.get(i); + } + } + return null; + } + + /** + * Adds mapping between MD5 and Avro schema. + * Caller to ensure that addSchema is called only once per MD5-Schema pair. + * + * @param md5 MD5 for the Schema + * @param schema Avro Schema + */ + private void addSchema(byte[] md5, org.apache.avro.Schema schema) { + md5s.add(Arrays.copyOf(md5, md5.length)); + schemas.add(schema); + } + } + + protected URL makeRandomUrl(String reference) + throws MalformedURLException { + Random rand = new Random(); + int randomInteger = rand.nextInt(schemaRegistryUrls.length); + return new URL(schemaRegistryUrls[randomInteger] + reference); + } + + protected String[] parseSchemaRegistryUrls(String schemaConfig) { + return schemaConfig.split(","); + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java deleted file mode 100644 index 802062f..0000000 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.realtime.impl.kafka2; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.BytesDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.pinot.core.realtime.stream.StreamConfig; - -import java.io.IOException; -import java.util.Collections; -import java.util.Properties; - -public abstract class KafkaConnectionHandler { - - protected final KafkaPartitionLevelStreamConfig _config; - protected final int _partition; - protected final String _topic; - protected final Consumer<String, byte[]> _consumer; - protected final TopicPartition _topicPartition; - - public KafkaConnectionHandler(StreamConfig streamConfig, int partition) { - _config = new KafkaPartitionLevelStreamConfig(streamConfig); - _partition = partition; - _topic = _config.getKafkaTopicName(); - Properties consumerProp = new Properties(); - consumerProp.putAll(streamConfig.getStreamConfigsMap()); - consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts()); - consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); - _consumer = new KafkaConsumer<>(consumerProp); - _topicPartition = new TopicPartition(_topic, _partition); - _consumer.assign(Collections.singletonList(_topicPartition)); - - } - - public void close() throws IOException { - _consumer.close(); - } - - -} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java new file mode 100644 index 0000000..8d1fd96 --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Arrays; +import java.util.Map; +import org.apache.pinot.common.data.FieldSpec; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.common.utils.JsonUtils; +import org.apache.pinot.core.data.GenericRow; +import org.apache.pinot.core.realtime.stream.StreamMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJSONMessageDecoder.class); + + private Schema schema; + + @Override + public void init(Map<String, String> props, Schema indexingSchema, String topicName) + throws Exception { + this.schema = indexingSchema; + } + + @Override + public GenericRow decode(byte[] payload, GenericRow destination) { + try { + JsonNode message = JsonUtils.bytesToJsonNode(payload); + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + String column = fieldSpec.getName(); + destination.putField(column, JsonUtils.extractValue(message.get(column), fieldSpec)); + } + return destination; + } catch (Exception e) { + LOGGER.error("Caught exception while decoding row, discarding row.", e); + return null; + } + } + + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java deleted file mode 100644 index 22aa683..0000000 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.realtime.impl.kafka2; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.pinot.core.realtime.stream.MessageBatch; - -import java.util.ArrayList; -import java.util.List; - -public class KafkaMessageBatch implements MessageBatch<byte[]> { - - private List<MessageAndOffset> messageList = new ArrayList<>(); - - public KafkaMessageBatch(Iterable<ConsumerRecord<String, byte[]>> iterable) { - for (ConsumerRecord<String, byte[]> record : iterable) { - messageList.add(new MessageAndOffset(record.value(), record.offset())); - } - } - - @Override - public int getMessageCount() { - return messageList.size(); - } - - @Override - public byte[] getMessageAtIndex(int index) { - return messageList.get(index).getMessage().array(); - } - - @Override - public int getMessageOffsetAtIndex(int index) { - return messageList.get(index).getMessage().arrayOffset(); - } - - @Override - public int getMessageLengthAtIndex(int index) { - return messageList.get(index).getMessage().array().length; - } - - @Override - public long getNextStreamMessageOffsetAtIndex(int index) { - return messageList.get(index).getNextOffset(); - } - - public Iterable<MessageAndOffset> iterable() { - return messageList; - } -} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java deleted file mode 100644 index de3295d..0000000 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.realtime.impl.kafka2; - -import org.apache.kafka.clients.consumer.*; - -import org.apache.pinot.core.realtime.stream.MessageBatch; -import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer; -import org.apache.pinot.core.realtime.stream.StreamConfig; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeoutException; - -public class KafkaPartitionConsumer extends KafkaConnectionHandler implements PartitionLevelConsumer { - - - public KafkaPartitionConsumer(StreamConfig streamConfig, int partition) { - super(streamConfig, partition); - } - - @Override - public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) throws TimeoutException { - _consumer.seek(_topicPartition, startOffset); - - ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(null); - List<ConsumerRecord<String, byte[]>> records = consumerRecords.records(_topicPartition); - return new KafkaMessageBatch(records); - } - - @Override - public void close() throws IOException { - super.close(); - } -} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java deleted file mode 100644 index c154a38..0000000 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.realtime.impl.kafka2; - -import com.google.common.base.Preconditions; -import org.apache.commons.lang.StringUtils; -import org.apache.pinot.common.utils.EqualityUtils; -import org.apache.pinot.core.realtime.stream.StreamConfig; - -import java.util.Map; -import java.util.Properties; - -public class KafkaPartitionLevelStreamConfig { - - private final String _kafkaTopicName; - private final String _bootstrapHosts; - private final int _kafkaBufferSize; - private final int _kafkaSocketTimeout; - private final int _kafkaFetcherSizeBytes; - private final int _kafkaFetcherMinBytes; - private final Map<String, String> _streamConfigMap; - - /** - * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs - * @param streamConfig - */ - public KafkaPartitionLevelStreamConfig(StreamConfig streamConfig) { - _streamConfigMap = streamConfig.getStreamConfigsMap(); - - _kafkaTopicName = streamConfig.getTopicName(); - - String llcBrokerListKey = KafkaStreamConfigProperties - .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST); - String llcBufferKey = KafkaStreamConfigProperties - .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE); - String llcTimeoutKey = KafkaStreamConfigProperties - .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT); - String fetcherSizeKey = KafkaStreamConfigProperties - .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES); - String fetcherMinBytesKey = KafkaStreamConfigProperties - .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES); - _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey); - _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey, - KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT); - _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, llcTimeoutKey, - KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT); - _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize); - _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey, - KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT); - Preconditions.checkNotNull(_bootstrapHosts, - "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer"); - } - - public String getKafkaTopicName() { - return _kafkaTopicName; - } - - public String getBootstrapHosts() { - return _bootstrapHosts; - } - - public int getKafkaBufferSize() { - return _kafkaBufferSize; - } - - public int getKafkaSocketTimeout() { - return _kafkaSocketTimeout; - } - - public int getKafkaFetcherSizeBytes() { - return _kafkaFetcherSizeBytes; - } - - public int getKafkaFetcherMinBytes() { - return _kafkaFetcherMinBytes; - } - - private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) { - String stringValue = configMap.get(key); - try { - if (StringUtils.isNotEmpty(stringValue)) { - return Integer.parseInt(stringValue); - } - return defaultValue; - } catch (NumberFormatException ex) { - return defaultValue; - } - } - - @Override - public String toString() { - return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='" - + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + '\'' + ", 
_kafkaSocketTimeout='" - + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + _kafkaFetcherSizeBytes + '\'' + ", _kafkaFetcherMinBytes='" - + _kafkaFetcherMinBytes + '\'' + '}'; - } - - @Override - public boolean equals(Object o) { - if (EqualityUtils.isSameReference(this, o)) { - return true; - } - - if (EqualityUtils.isNullOrNotSameClass(this, o)) { - return false; - } - - KafkaPartitionLevelStreamConfig that = (KafkaPartitionLevelStreamConfig) o; - - return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils - .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils - .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils - .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils - .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils - .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes); - } - - @Override - public int hashCode() { - int result = EqualityUtils.hashCodeOf(_kafkaTopicName); - result = EqualityUtils.hashCodeOf(result, _bootstrapHosts); - result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize); - result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout); - result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes); - result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes); - return result; - } -} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java deleted file mode 100644 index 3871d85..0000000 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.realtime.impl.kafka2; - -import com.google.common.base.Preconditions; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.DescribeTopicsResult; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.KafkaFuture; -import org.apache.pinot.core.realtime.stream.OffsetCriteria; -import org.apache.pinot.core.realtime.stream.StreamConfig; -import org.apache.pinot.core.realtime.stream.StreamMetadataProvider; - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.time.Duration; -import java.util.Collections; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - -public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider { - - private AdminClient _adminClient; - - public KafkaStreamMetadataProvider(StreamConfig streamConfig, int partition) { - super(streamConfig, partition); - final Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts()); - _adminClient = AdminClient.create(props); - } - - @Override - public int fetchPartitionCount(long timeoutMillis) { - DescribeTopicsResult result = _adminClient.describeTopics(Collections.singletonList(_config.getKafkaTopicName())); - Map<String, KafkaFuture<TopicDescription>> values = result.values(); - KafkaFuture<TopicDescription> topicDescription = values.get(_config.getKafkaTopicName()); - try { - return topicDescription.get().partitions().size(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(""); - } - } - - @Override - public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException { - - Preconditions.checkNotNull(offsetCriteria); - if (offsetCriteria.isLargest()) { - return _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition); - } else if (offsetCriteria.isSmallest()) { - return _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition); - } else { - throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString()); - } - - } - - @Override - public void close() throws IOException { - super.close(); - } -} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java index 0dea267..b5bdaba 100644 --- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java @@ -20,30 +20,34 @@ package org.apache.pinot.core.realtime.impl.kafka2; import java.nio.ByteBuffer; + public class MessageAndOffset { - private ByteBuffer _message; - private long _offset; + private ByteBuffer _message; + private long _offset; + + public MessageAndOffset(byte[] message, long offset) { + this(ByteBuffer.wrap(message), offset); + } - public MessageAndOffset(byte[] message, long offset) { - _message = ByteBuffer.wrap(message); - _offset = offset; - } + public 
MessageAndOffset(ByteBuffer message, long offset) { + _message = message; + _offset = offset; + } - public MessageAndOffset(ByteBuffer message, long offset) { - _message = message; - _offset = offset; - } + public ByteBuffer getMessage() { + return _message; + } - public ByteBuffer getMessage() { - return _message; - } + public long getOffset() { + return _offset; + } - public long getOffset() { - return _offset; - } + public long getNextOffset() { + return getOffset() + 1; + } - public long getNextOffset() { - return _offset + 1; - } + public int payloadSize() { + return getMessage().array().length; + } } diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java new file mode 100644 index 0000000..cc28127 --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.pinot.core.realtime.impl.kafka2.utils.MiniKafkaCluster; +import org.apache.pinot.core.realtime.stream.OffsetCriteria; +import org.apache.pinot.core.realtime.stream.StreamConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Tests for the KafkaPartitionLevelConsumer. 
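+ * <p>The test spins up a single-broker MiniKafkaCluster backed by an embedded ZooKeeper, creates one
+ * single-partition topic and one two-partition topic, produces 1000 messages to each, and then verifies
+ * consumer construction, partition counting, message fetching and offset fetching against the live broker.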
+ */
+public class KafkaPartitionLevelConsumerTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
+  private static final long STABILIZE_SLEEP_DELAYS = 3000;
+  private static final String TEST_TOPIC_1 = "foo";
+  private static final String TEST_TOPIC_2 = "bar";
+  private static final int NUM_MSG_PRODUCED = 1000;
+
+  private static MiniKafkaCluster kafkaCluster;
+  private static String brokerAddress;
+
+  @BeforeClass
+  public static void setup()
+      throws Exception {
+    kafkaCluster = new MiniKafkaCluster.Builder().newServer("0").build();
+    LOGGER.info("Trying to start MiniKafkaCluster");
+    kafkaCluster.start();
+    brokerAddress = getKafkaBroker();
+    kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
+    kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
+    Thread.sleep(STABILIZE_SLEEP_DELAYS);
+    produceMsgToKafka();
+    Thread.sleep(STABILIZE_SLEEP_DELAYS);
+  }
+
+  private static void produceMsgToKafka() {
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBroker());
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    KafkaProducer<String, String> p = new KafkaProducer<>(props);
+    for (int i = 0; i < NUM_MSG_PRODUCED; i++) {
+      p.send(new ProducerRecord<>(TEST_TOPIC_1, "sample_msg_" + i));
+      p.send(new ProducerRecord<>(TEST_TOPIC_2, "sample_msg_" + i));
+    }
+  }
+
+  private static String getKafkaBroker() {
+    return "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+  }
+
+  @AfterClass
+  public static void shutDown()
+      throws Exception {
+    kafkaCluster.deleteTopic(TEST_TOPIC_1);
+    kafkaCluster.deleteTopic(TEST_TOPIC_2);
+    kafkaCluster.close();
+  }
+
+  @Test
+  public void testBuildConsumer()
+      throws Exception {
+    String streamType = "kafka";
+    String streamKafkaTopicName = "theTopic";
+    String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfigMap.put("stream.kafka.fetcher.size", "10000");
+    streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    Kafka2PartitionLevelStreamMetadataProvider streamMetadataProvider =
+        new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig);
+
+    // test default value
+    Kafka2PartitionLevelPartitionLevelConsumer kafkaSimpleStreamConsumer =
+        new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, 0);
+    kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaBufferSize());
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+
kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaSocketTimeout()); + + // test parsing values + Assert.assertEquals(10000, + kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaFetcherSizeBytes()); + Assert + .assertEquals(20000, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaFetcherMinBytes()); + + // test user defined values + streamConfigMap.put("stream.kafka.buffer.size", "100"); + streamConfigMap.put("stream.kafka.socket.timeout", "1000"); + streamConfig = new StreamConfig(streamConfigMap); + kafkaSimpleStreamConsumer = new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, 0); + kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000); + Assert.assertEquals(100, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaBufferSize()); + Assert.assertEquals(1000, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaSocketTimeout()); + } + + @Test + public void testGetPartitionCount() { + String streamType = "kafka"; + String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0); + String streamKafkaConsumerType = "simple"; + String clientId = "clientId"; + + Map<String, String> streamConfigMap = new HashMap<>(); + streamConfigMap.put("streamType", streamType); + streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1); + streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList); + streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType); + streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName()); + streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass"); + StreamConfig streamConfig = new StreamConfig(streamConfigMap); + + Kafka2PartitionLevelStreamMetadataProvider streamMetadataProvider = + new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig); + Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 1); + + streamConfigMap = new HashMap<>(); + streamConfigMap.put("streamType", streamType); + streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_2); + streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList); + streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType); + streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName()); + streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass"); + streamConfig = new StreamConfig(streamConfigMap); + + streamMetadataProvider = new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig); + Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2); + } + + @Test + public void testFetchMessages() + throws Exception { + String streamType = "kafka"; + String streamKafkaTopicName = "theTopic"; + String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0); + String streamKafkaConsumerType = "simple"; + String clientId = "clientId"; + + Map<String, String> streamConfigMap = new HashMap<>(); + streamConfigMap.put("streamType", streamType); + streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName); + streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList); + streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType); + streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName()); + streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass"); + 
StreamConfig streamConfig = new StreamConfig(streamConfigMap); + + int partition = 0; + Kafka2PartitionLevelPartitionLevelConsumer kafkaSimpleStreamConsumer = + new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, partition); + kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000); + } + + @Test + public void testFetchOffsets() + throws Exception { + testFetchOffsets(TEST_TOPIC_1); + testFetchOffsets(TEST_TOPIC_2); + } + + private void testFetchOffsets(String topic) + throws Exception { + String streamType = "kafka"; + String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0); + String streamKafkaConsumerType = "simple"; + String clientId = "clientId"; + + Map<String, String> streamConfigMap = new HashMap<>(); + streamConfigMap.put("streamType", streamType); + streamConfigMap.put("stream.kafka.topic.name", topic); + streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList); + streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType); + streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName()); + streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass"); + StreamConfig streamConfig = new StreamConfig(streamConfigMap); + + int numPartitions = + new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig).fetchPartitionCount(10000); + for (int partition = 0; partition < numPartitions; partition++) { + Kafka2PartitionLevelStreamMetadataProvider kafkaStreamMetadataProvider = + new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig, partition); + Assert.assertEquals(0, kafkaStreamMetadataProvider + .fetchPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(), 10000)); + Assert.assertEquals(NUM_MSG_PRODUCED / numPartitions, kafkaStreamMetadataProvider + .fetchPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(), 10000)); + } + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java new file mode 100644 index 0000000..df02b9f --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.core.realtime.stream.StreamConfig; +import org.apache.pinot.core.realtime.stream.StreamConfigProperties; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class KafkaPartitionLevelStreamConfigTest { + + private Kafka2PartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, + String socketTimeout) { + return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null); + } + + private Kafka2PartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, + String socketTimeout, String fetcherSize, String fetcherMinBytes) { + Map<String, String> streamConfigMap = new HashMap<>(); + String streamType = "kafka"; + String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString(); + String consumerFactoryClassName = Kafka2ConsumerFactory.class.getName(); + String decoderClass = KafkaAvroMessageDecoder.class.getName(); + streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType); + streamConfigMap + .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), + topic); + streamConfigMap + .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES), + consumerType); + streamConfigMap.put(StreamConfigProperties + .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), + consumerFactoryClassName); + streamConfigMap + .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS), + decoderClass); + streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts); + if (buffer != null) { + streamConfigMap.put("stream.kafka.buffer.size", buffer); + } + if (socketTimeout != null) { + streamConfigMap.put("stream.kafka.socket.timeout", socketTimeout); + } + if (fetcherSize != null) { + streamConfigMap.put("stream.kafka.fetcher.size", fetcherSize); + } + if (fetcherMinBytes != null) { + streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes); + } + return new Kafka2PartitionLevelStreamConfig(new StreamConfig(streamConfigMap)); + } + + @Test + public void testGetKafkaTopicName() { + Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "", "", ""); + Assert.assertEquals("topic", config.getKafkaTopicName()); + } + + @Test + public void testGetBootstrapHosts() { + Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", ""); + Assert.assertEquals("host1", config.getBootstrapHosts()); + } + + @Test + public void testGetKafkaBufferSize() { + // test default + Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", null, ""); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, + config.getKafkaBufferSize()); + + config = getStreamConfig("topic", "host1", "", ""); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, + config.getKafkaBufferSize()); + + config = getStreamConfig("topic", "host1", "bad value", ""); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, + config.getKafkaBufferSize()); + + // correct config + config = getStreamConfig("topic", "host1", "100", ""); + Assert.assertEquals(100, config.getKafkaBufferSize()); + } + + @Test + public void testGetKafkaSocketTimeout() { + // test 
default + Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", null); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, + config.getKafkaSocketTimeout()); + + config = getStreamConfig("topic", "host1", "", ""); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, + config.getKafkaSocketTimeout()); + + config = getStreamConfig("topic", "host1", "", "bad value"); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT, + config.getKafkaSocketTimeout()); + + // correct config + config = getStreamConfig("topic", "host1", "", "100"); + Assert.assertEquals(100, config.getKafkaSocketTimeout()); + } + + @Test + public void testGetFetcherSize() { + // test default + Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT, + config.getKafkaFetcherSizeBytes()); + + config = getStreamConfig("topic", "host1", "100", "", "", null); + Assert.assertEquals(100, config.getKafkaFetcherSizeBytes()); + + config = getStreamConfig("topic", "host1", "100", "", "bad value", null); + Assert.assertEquals(100, config.getKafkaFetcherSizeBytes()); + + // correct config + config = getStreamConfig("topic", "host1", "100", "", "200", null); + Assert.assertEquals(200, config.getKafkaFetcherSizeBytes()); + } + + @Test + public void testGetFetcherMinBytes() { + // test default + Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT, + config.getKafkaFetcherMinBytes()); + + config = getStreamConfig("topic", "host1", "", "", "", ""); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT, + config.getKafkaFetcherMinBytes()); + + config = getStreamConfig("topic", "host1", "", "", "", "bad value"); + Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT, + config.getKafkaFetcherMinBytes()); + + // correct config + config = getStreamConfig("topic", "host1", "", "", "", "100"); + Assert.assertEquals(100, config.getKafkaFetcherMinBytes()); + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java new file mode 100644 index 0000000..47370aa --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.impl.kafka2.utils; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import org.apache.commons.io.FileUtils; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; + + +public class EmbeddedZooKeeper implements Closeable { + + private static final int TICK_TIME = 500; + private final NIOServerCnxnFactory factory; + private final ZooKeeperServer zookeeper; + private final File tmpDir; + private final int port; + + EmbeddedZooKeeper() throws IOException, InterruptedException { + this.tmpDir = Files.createTempDirectory(null).toFile(); + this.factory = new NIOServerCnxnFactory(); + this.zookeeper = new ZooKeeperServer(new File(tmpDir, "data"), new File(tmpDir, "log"), + TICK_TIME); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0); + factory.configure(addr, 0); + factory.startup(zookeeper); + this.port = zookeeper.getClientPort(); + } + + public int getPort() { + return port; + } + + @Override + public void close() throws IOException { + zookeeper.shutdown(); + factory.shutdown(); + FileUtils.deleteDirectory(tmpDir); + } +} diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java new file mode 100644 index 0000000..3ec32fc --- /dev/null +++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.realtime.impl.kafka2.utils; + +import java.io.Closeable; +import java.io.IOException; +import java.net.ServerSocket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import org.apache.commons.io.FileUtils; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.Seq; + + +public final class MiniKafkaCluster implements Closeable { + + private static final Logger LOGGER = LoggerFactory.getLogger(MiniKafkaCluster.class); + private final EmbeddedZooKeeper zkServer; + private final ArrayList<KafkaServer> kafkaServer; + private final Path tempDir; + private final AdminClient adminClient; + + @SuppressWarnings({"rawtypes", "unchecked"}) + private MiniKafkaCluster(List<String> brokerIds) + throws IOException, InterruptedException { + this.zkServer = new EmbeddedZooKeeper(); + this.tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "mini-kafka-cluster"); + this.kafkaServer = new ArrayList<>(); + int port = 0; + for (String id : brokerIds) { + port = getAvailablePort(); + KafkaConfig c = new KafkaConfig(createBrokerConfig(id, port)); + Seq seq = + scala.collection.JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq(); + kafkaServer.add(new KafkaServer(c, Time.SYSTEM, Option.empty(), seq)); + } + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + port); + adminClient = AdminClient.create(props); + } + + static int getAvailablePort() { + try { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } catch (IOException e) { + throw new RuntimeException("Failed to find available port to use", e); + } + } + + private Properties createBrokerConfig(String nodeId, int port) + throws IOException { + Properties props = new Properties(); + props.put("broker.id", nodeId); + props.put("port", Integer.toString(port)); + props.put("log.dir", Files.createTempDirectory(tempDir, "broker-").toAbsolutePath().toString()); + props.put("zookeeper.connect", "127.0.0.1:" + zkServer.getPort()); + props.put("replica.socket.timeout.ms", "1500"); + props.put("controller.socket.timeout.ms", "1500"); + props.put("controlled.shutdown.enable", "true"); + props.put("delete.topic.enable", "true"); + props.put("auto.create.topics.enable", "true"); + props.put("offsets.topic.replication.factor", "1"); + props.put("controlled.shutdown.retry.backoff.ms", "100"); + props.put("log.cleaner.dedupe.buffer.size", "2097152"); + return props; + } + + public void start() { + for (KafkaServer s : kafkaServer) { + s.startup(); + } + } + + @Override + public void close() + throws IOException { + for (KafkaServer s : kafkaServer) { + s.shutdown(); + } + this.zkServer.close(); + 
FileUtils.deleteDirectory(tempDir.toFile()); + } + + public EmbeddedZooKeeper getZkServer() { + return zkServer; + } + + public List<KafkaServer> getKafkaServer() { + return kafkaServer; + } + + public int getKafkaServerPort(int index) { + return kafkaServer.get(index).socketServer() + .boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)); + } + + public AdminClient getAdminClient() { + return adminClient; + } + + public boolean createTopic(String topicName, int numPartitions, int replicationFactor) { + NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) replicationFactor); + CreateTopicsResult createTopicsResult = this.adminClient.createTopics(Arrays.asList(newTopic)); + try { + createTopicsResult.all().get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Failed to create Kafka topic: {}, Exception: {}", newTopic.toString(), e); + return false; + } + return true; + } + + public boolean deleteTopic(String topicName) { + final DeleteTopicsResult deleteTopicsResult = this.adminClient.deleteTopics(Collections.singletonList(topicName)); + try { + deleteTopicsResult.all().get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Failed to delete Kafka topic: {}, Exception: {}", topicName, e); + return false; + } + return true; + } + + public static class Builder { + + private List<String> brokerIds = new ArrayList<>(); + + public Builder newServer(String brokerId) { + brokerIds.add(brokerId); + return this; + } + + public MiniKafkaCluster build() + throws IOException, InterruptedException { + return new MiniKafkaCluster(brokerIds); + } + } +} \ No newline at end of file diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml index 64d798d..1b45cc1 100644 --- a/pinot-connectors/pom.xml +++ b/pinot-connectors/pom.xml @@ -47,11 +47,23 @@ <groupId>org.apache.pinot</groupId> <artifactId>pinot-common</artifactId> <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.pinot</groupId> <artifactId>pinot-core</artifactId> <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </exclusion> + </exclusions> </dependency> <!-- test --> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
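For readers who want to try the new connector, here is a minimal sketch (not part of the patch) of how a
realtime stream configuration can be wired up against the new Kafka 2.0 consumer factory. It uses only the
property keys exercised by KafkaPartitionLevelConsumerTest above; the topic name and broker address are
placeholders rather than values from this commit.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.core.realtime.stream.StreamConfig;

    public class Kafka2StreamConfigExample {
      public static StreamConfig buildExampleConfig() {
        Map<String, String> streamConfigMap = new HashMap<>();
        streamConfigMap.put("streamType", "kafka");
        // Placeholder topic and broker; replace with real values.
        streamConfigMap.put("stream.kafka.topic.name", "myTopic");
        streamConfigMap.put("stream.kafka.broker.list", "127.0.0.1:9092");
        // "simple" selects the partition-level (low-level) consumer.
        streamConfigMap.put("stream.kafka.consumer.type", "simple");
        streamConfigMap.put("stream.kafka.consumer.factory.class.name",
            "org.apache.pinot.core.realtime.impl.kafka2.Kafka2ConsumerFactory");
        streamConfigMap.put("stream.kafka.decoder.class.name",
            "org.apache.pinot.core.realtime.impl.kafka2.KafkaJSONMessageDecoder");
        // Optional tuning knobs; Kafka2StreamConfigProperties defaults apply when unset.
        streamConfigMap.put("stream.kafka.fetcher.size", "10000");
        streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
        return new StreamConfig(streamConfigMap);
      }
    }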
