Added ability to query by partition and/or offset Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/c6be1aea Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/c6be1aea Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/c6be1aea
Branch: refs/heads/master Commit: c6be1aea314a62fc26e1212248cf9abc9f915bca Parents: 5b8176e Author: Kasper Sørensen <[email protected]> Authored: Sun Jan 28 12:41:50 2018 -0800 Committer: Kasper Sørensen <[email protected]> Committed: Sun Jan 28 12:42:07 2018 -0800 ---------------------------------------------------------------------- kafka/pom.xml | 3 +- .../metamodel/kafka/KafkaConsumerFactory.java | 15 +- .../metamodel/kafka/KafkaDataContext.java | 152 +++++++++++++++++- .../apache/metamodel/kafka/KafkaDataSet.java | 10 +- .../kafka/KafkaDataContextIntegrationTest.java | 156 +++++++++++++++++++ pom.xml | 1 + 6 files changed, 324 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/kafka/pom.xml b/kafka/pom.xml index 0227218..52d557a 100644 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -19,6 +19,7 @@ <modelVersion>4.0.0</modelVersion> <artifactId>MetaModel-kafka</artifactId> <name>MetaModel module for Apache Kafka</name> + <dependencies> <dependency> <groupId>org.apache.metamodel</groupId> @@ -34,7 +35,7 @@ <!-- Test dependencies --> <dependency> <groupId>org.slf4j</groupId> - <artifactId>slf4j-nop</artifactId> + <artifactId>slf4j-simple</artifactId> <scope>test</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java index 6cfd101..15e5f70 100644 --- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java +++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java @@ -22,6 +22,7 @@ 
import java.nio.ByteBuffer; import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteBufferDeserializer; @@ -55,10 +56,16 @@ public class KafkaConsumerFactory implements ConsumerFactory { public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) { final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis(); - final Properties properties = new Properties(baseProperties); - properties.setProperty("group.id", groupId); - properties.setProperty("key.deserializer", deserializerForClass(keyClass).getName()); - properties.setProperty("value.deserializer", deserializerForClass(keyClass).getName()); + final Properties properties = new Properties(); + baseProperties.stringPropertyNames().forEach(k -> { + properties.setProperty(k, baseProperties.getProperty(k)); + }); + + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass).getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerForClass(valueClass) + .getName()); return new KafkaConsumer<>(properties); } http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java index f6cd664..eda881c 100644 --- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java +++ 
b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java @@ -18,8 +18,12 @@ */ package org.apache.metamodel.kafka; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -29,7 +33,11 @@ import org.apache.kafka.common.TopicPartition; import org.apache.metamodel.MetaModelException; import org.apache.metamodel.QueryPostprocessDataContext; import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.FirstRowDataSet; import org.apache.metamodel.data.MaxRowsDataSet; +import org.apache.metamodel.query.FilterItem; +import org.apache.metamodel.query.OperatorType; +import org.apache.metamodel.query.SelectItem; import org.apache.metamodel.schema.Column; import org.apache.metamodel.schema.ColumnType; import org.apache.metamodel.schema.ColumnTypeImpl; @@ -49,6 +57,11 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext { public static final String COLUMN_KEY = "key"; public static final String COLUMN_VALUE = "value"; + private static final Set<OperatorType> OPTIMIZED_PARTITION_OPERATORS = new HashSet<>(Arrays.asList( + OperatorType.EQUALS_TO, OperatorType.IN)); + private static final Set<OperatorType> OPTIMIZED_OFFSET_OPERATORS = new HashSet<>(Arrays.asList( + OperatorType.GREATER_THAN, OperatorType.GREATER_THAN_OR_EQUAL)); + private final Class<K> keyClass; private final Class<V> valueClass; private final ConsumerFactory consumerFactory; @@ -95,17 +108,148 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext { final String topic = table.getName(); final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass); final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); - List<TopicPartition> partitions = partitionInfos.stream().map(partitionInfo -> { + final List<TopicPartition> partitions 
= partitionInfos.stream().map(partitionInfo -> { return new TopicPartition(topic, partitionInfo.partition()); }).collect(Collectors.toList()); - consumer.seekToBeginning(partitions); consumer.assign(partitions); + consumer.seekToBeginning(partitions); + + final List<SelectItem> selectItems = columns.stream().map(col -> new SelectItem(col)).collect(Collectors + .toList()); + return materializeMainSchemaTableFromConsumer(consumer, selectItems, 0, maxRows); + } + + protected DataSet materializeMainSchemaTableFromConsumer(Consumer<K, V> consumer, List<SelectItem> selectItems, + int offset, int maxRows) { + DataSet dataSet = new KafkaDataSet<K, V>(consumer, selectItems); + if (offset > 0) { + dataSet = new FirstRowDataSet(dataSet, offset); + } if (maxRows > 0) { - return new MaxRowsDataSet(new KafkaDataSet<K, V>(consumer, columns), maxRows); + dataSet = new MaxRowsDataSet(dataSet, maxRows); + } + return dataSet; + } + + @Override + protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, List<FilterItem> whereItems, + int firstRow, int maxRows) { + // check if we can optimize the consumption when either "partition" or "offset" + // are in the where items. 
+ if (!whereItems.isEmpty()) { + final boolean optimizable = whereItems.stream().allMatch(this::isOptimizable) && whereItems.stream().map(item -> item.getSelectItem().getColumn().getName()).distinct().count() == whereItems.size(); // a column filtered more than once (e.g. "offset > 5 AND offset >= 3") cannot be expressed as one seek position / partition list, so leave such queries to the generic path + if (optimizable) { + long offset = 0; + List<Integer> partitions = null; + + for (FilterItem whereItem : whereItems) { + final OperatorType operator = whereItem.getOperator(); + switch (whereItem.getSelectItem().getColumn().getName()) { + case COLUMN_OFFSET: + if (operator == OperatorType.GREATER_THAN) { + offset = toLong(whereItem.getOperand()) + 1; + } else if (operator == OperatorType.GREATER_THAN_OR_EQUAL) { + offset = toLong(whereItem.getOperand()); + } else { + throw new UnsupportedOperationException(); + } + break; + case COLUMN_PARTITION: + if (operator == OperatorType.EQUALS_TO) { + partitions = Arrays.asList(toInt(whereItem.getOperand())); + } else if (operator == OperatorType.IN) { + partitions = toIntList(whereItem.getOperand()); + } else { + throw new UnsupportedOperationException(); + } + break; + default: + throw new UnsupportedOperationException(); + } + } + + final String topic = table.getName(); + final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass); + + // handle partition filtering + final List<TopicPartition> assignedPartitions; + if (partitions == null) { + final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); + assignedPartitions = partitionInfos.stream().map(partitionInfo -> { + return new TopicPartition(topic, partitionInfo.partition()); + }).collect(Collectors.toList()); + } else { + assignedPartitions = partitions.stream().map(partitionNumber -> { + return new TopicPartition(topic, partitionNumber); + }).collect(Collectors.toList()); + } + + // handle offset filtering + consumer.assign(assignedPartitions); + if (offset <= 0) { // record offsets are never negative, so e.g. "offset > -5" simply means "from the beginning" (and seek() rejects negative offsets) + consumer.seekToBeginning(assignedPartitions); + } else { + for (TopicPartition topicPartition : assignedPartitions) { + consumer.seek(topicPartition, offset); + } + } + + return materializeMainSchemaTableFromConsumer(consumer, selectItems, 
firstRow, maxRows); + } + } + return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows); + } + + private static List<Integer> toIntList(Object operand) { + final List<Integer> list = new ArrayList<>(); + if (operand instanceof Iterable || operand instanceof Object[]) { // IN operands may be supplied as an array as well as a collection + ((Iterable<?>) (operand instanceof Object[] ? Arrays.asList((Object[]) operand) : operand)).forEach(o -> { + list.add(toInt(o)); + }); } - return new KafkaDataSet<K, V>(consumer, columns); + return list; } + private static int toInt(Object obj) { + if (obj instanceof Number) { + return ((Number) obj).intValue(); + } + return Integer.parseInt(obj.toString()); + } + + private static long toLong(Object obj) { + if (obj instanceof Number) { + return ((Number) obj).longValue(); + } + return Long.parseLong(obj.toString()); + } + + private boolean isOptimizable(FilterItem whereItem) { + if (whereItem.isCompoundFilter()) { + return false; + } + if (whereItem.getExpression() != null || whereItem.getOperand() == null || whereItem.getOperand() instanceof SelectItem) { // only literal operands can be turned into a seek position or a partition list + return false; + } + final SelectItem selectItem = whereItem.getSelectItem(); + if (selectItem.getExpression() != null || selectItem.getAggregateFunction() != null || selectItem + .getScalarFunction() != null) { + return false; + } + final Column column = selectItem.getColumn(); + if (column == null) { + return false; + } + + switch (column.getName()) { + case COLUMN_OFFSET: + return OPTIMIZED_OFFSET_OPERATORS.contains(whereItem.getOperator()); + case COLUMN_PARTITION: + return OPTIMIZED_PARTITION_OPERATORS.contains(whereItem.getOperator()) && (whereItem.getOperator() != OperatorType.IN || whereItem.getOperand() instanceof Iterable || whereItem.getOperand() instanceof Object[]); // toIntList() can only expand collections/arrays + default: + return false; + } + } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java index 90dcf38..a214964 100644 --- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java +++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java @@ 
-20,7 +20,6 @@ package org.apache.metamodel.kafka; import java.util.Iterator; import java.util.List; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -30,7 +29,6 @@ import org.apache.metamodel.data.CachingDataSetHeader; import org.apache.metamodel.data.DefaultRow; import org.apache.metamodel.data.Row; import org.apache.metamodel.query.SelectItem; -import org.apache.metamodel.schema.Column; final class KafkaDataSet<K, V> extends AbstractDataSet { @@ -40,12 +38,16 @@ final class KafkaDataSet<K, V> extends AbstractDataSet { private Iterator<ConsumerRecord<K, V>> currentIterator; private ConsumerRecord<K, V> currentRow; - public KafkaDataSet(Consumer<K, V> consumer, List<Column> columns) { - super(new CachingDataSetHeader(columns.stream().map(col -> new SelectItem(col)).collect(Collectors.toList()))); + public KafkaDataSet(Consumer<K, V> consumer, List<SelectItem> selectItems) { + super(new CachingDataSetHeader(selectItems)); this.consumer = consumer; this.pollTimeout = Long.parseLong(System.getProperty(KafkaDataContext.SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT, "1000")); } + + public Consumer<K, V> getConsumer() { + return consumer; + } @Override public boolean next() { http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java new file mode 100644 index 0000000..2d0cf5e --- /dev/null +++ b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java @@ -0,0 +1,156 @@ +package org.apache.metamodel.kafka; + +import java.util.Arrays; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import 
org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.metamodel.DataContext; +import org.apache.metamodel.data.DataSet; +import org.apache.metamodel.data.WrappingDataSet; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaDataContextIntegrationTest { + + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + private static final Logger logger = LoggerFactory.getLogger(KafkaDataContextIntegrationTest.class); + + @Test + public void testGetSchemaInfo() { + final DataContext dataContext1 = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays + .asList("non-existing-topic")); + + Assert.assertEquals("[non-existing-topic, default_table]", dataContext1.getDefaultSchema().getTableNames() + .toString()); + + final DataContext dataContext2 = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays + .asList("test1", "test2", "test3")); + Assert.assertEquals("[test1, test2, test3]", dataContext2.getDefaultSchema().getTableNames().toString()); + } + + @Test + public void testQueryNoFilters() { + final String topic = "test_" + UUID.randomUUID().toString().replaceAll("\\-", ""); + + final DataContext dataContext = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays + .asList(topic)); + + Assert.assertEquals("[" + topic + ", default_table]", dataContext.getDefaultSchema().getTableNames() + .toString()); + + final int numRecords = 10000; + + // create a producer thread + new Thread(createProducerRunnable(topic, numRecords), 
"producer").start(); + + int counter = 0; + try (DataSet dataSet = dataContext.query().from(topic).selectAll().execute()) { + logger.info("c: starting"); + while (dataSet.next()) { + counter++; + if (counter % 1000 == 0) { + logger.info("c: " + counter); + logger.info(dataSet.getRow().toString()); + } + } + logger.info("c: done - " + counter); + } + + Assert.assertEquals(numRecords, counter); + } + + @Test + public void testQueryUsingOffset() throws InterruptedException { + final String topic = "test_" + UUID.randomUUID().toString().replaceAll("\\-", ""); + + final DataContext dataContext = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays + .asList(topic)); + + final int numRecords = 1000; + final int queriedOffset = 500; + + // create a producer thread + final Thread thread = new Thread(createProducerRunnable(topic, numRecords), "producer"); + thread.start(); + thread.join(); // await completion so that the queried offset will exist at query time + + int counter = 0; + try (DataSet dataSet = dataContext.query().from(topic).selectAll().where("offset").gt(queriedOffset) + .execute()) { + + // check the assignment and position created for the consumer + @SuppressWarnings("resource") + DataSet innerDataSet = dataSet; + if (innerDataSet instanceof WrappingDataSet) { + innerDataSet = ((WrappingDataSet) dataSet).getWrappedDataSet(); + } + Assert.assertTrue(innerDataSet instanceof KafkaDataSet); + final Consumer<?, ?> consumer = ((KafkaDataSet<?, ?>) innerDataSet).getConsumer(); + final Set<TopicPartition> assignment = consumer.assignment(); + for (TopicPartition assignedTopic : assignment) { + final long position = consumer.position(assignedTopic); + Assert.assertEquals(queriedOffset + 1, position); + } + + while (dataSet.next()) { + counter++; + } + } + + // offset is 0 based, so "greater than 500" will leave 499 records + Assert.assertEquals(499, counter); + } + + private Runnable createProducerRunnable(String topic, int numRecords) { + 
return new Runnable() { + @Override + public void run() { + final Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "metamodel-test"); + + final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); + int counter = 0; + while (counter < numRecords) { + final String key = UUID.randomUUID().toString(); + final String value = UUID.randomUUID().toString(); + try { + producer.send(new ProducerRecord<String, String>(topic, key, value), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.info("Callback error"); + exception.printStackTrace(); + } + } + }); + } catch (Exception e) { + e.printStackTrace(); + break; + } + if (counter % 1000 == 0) { + logger.info("p: " + counter); + } + counter++; + } + logger.info("p: closing - " + counter); + producer.flush(); + producer.close(); + logger.info("p: done - " + counter); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 69d52c4..ee2d087 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ under the License. <module>json</module> <module>xml</module> <module>jdbc</module> + <module>kafka</module> <module>elasticsearch</module> <module>hadoop</module> <module>hbase</module>
