Repository: metamodel Updated Branches: refs/heads/master 0f7a09296 -> a868f71c0
Initial draft at a Kafka module for Apache MetaModel Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/5b8176e1 Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/5b8176e1 Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/5b8176e1 Branch: refs/heads/master Commit: 5b8176e18bd77e15317c6d7f7c22f7916a2db7c6 Parents: fe02a59 Author: Kasper Sørensen <[email protected]> Authored: Sat Jan 27 17:45:32 2018 -0800 Committer: Kasper Sørensen <[email protected]> Committed: Sat Jan 27 17:45:32 2018 -0800 ---------------------------------------------------------------------- .../org/apache/metamodel/MetaModelHelper.java | 2 +- .../apache/metamodel/data/AbstractDataSet.java | 1 - kafka/pom.xml | 46 ++++++++ .../apache/metamodel/kafka/ConsumerFactory.java | 32 ++++++ .../metamodel/kafka/KafkaConsumerFactory.java | 97 ++++++++++++++++ .../metamodel/kafka/KafkaDataContext.java | 111 +++++++++++++++++++ .../apache/metamodel/kafka/KafkaDataSet.java | 100 +++++++++++++++++ 7 files changed, 387 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/core/src/main/java/org/apache/metamodel/MetaModelHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/metamodel/MetaModelHelper.java b/core/src/main/java/org/apache/metamodel/MetaModelHelper.java index 59900f9..c484736 100644 --- a/core/src/main/java/org/apache/metamodel/MetaModelHelper.java +++ b/core/src/main/java/org/apache/metamodel/MetaModelHelper.java @@ -769,7 +769,7 @@ public final class MetaModelHelper { } public static SelectItem[] createSelectItems(Column... 
columns) { - SelectItem[] items = new SelectItem[columns.length]; + final SelectItem[] items = new SelectItem[columns.length]; for (int i = 0; i < items.length; i++) { items[i] = new SelectItem(columns[i]); } http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java b/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java index 33238a0..d382dc6 100644 --- a/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java +++ b/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java @@ -62,7 +62,6 @@ public abstract class AbstractDataSet extends BaseObject implements DataSet { _header = Objects.requireNonNull(header); } - /** * {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/kafka/pom.xml b/kafka/pom.xml new file mode 100644 index 0000000..0227218 --- /dev/null +++ b/kafka/pom.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <artifactId>MetaModel</artifactId> + <groupId>org.apache.metamodel</groupId> + <version>5.1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>MetaModel-kafka</artifactId> + <name>MetaModel module for Apache Kafka</name> + <dependencies> + <dependency> + <groupId>org.apache.metamodel</groupId> + <artifactId>MetaModel-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>1.0.0</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-nop</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java new file mode 100644 index 0000000..efabf38 --- /dev/null +++ b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+/**
+ * Creates the Kafka {@link Consumer} instances used by Apache MetaModel.
+ * Because Apache MetaModel may serve several queries at the same time, more
+ * than one consumer can be needed; implementations of this interface decide
+ * how each of them is constructed and configured.
+ */
+public interface ConsumerFactory {
+
+    /**
+     * Creates a consumer for reading the given topic.
+     *
+     * @param topic
+     *            the name of the topic the consumer will read
+     * @param keyClass
+     *            the type of the record keys
+     * @param valueClass
+     *            the type of the record values
+     * @return a consumer yielding keys of type {@code K} and values of type
+     *         {@code V}
+     */
+    <K, V> Consumer<K, V> createConsumer(String topic,
+            Class<K> keyClass,
+            Class<V> valueClass);
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
new file mode 100644
index 0000000..6cfd101
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.FloatDeserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.ShortDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * Default {@link ConsumerFactory} implementation. Every call to
+ * {@link #createConsumer(String, Class, Class)} creates a new
+ * {@link KafkaConsumer} configured from a set of base properties, a generated
+ * consumer group id and key/value deserializers matching the requested types.
+ */
+public class KafkaConsumerFactory implements ConsumerFactory {
+
+    private final Properties baseProperties;
+
+    /**
+     * Creates a factory for consumers connecting to the given bootstrap servers.
+     *
+     * @param bootstrapServers
+     *            the value of the Kafka {@code bootstrap.servers} setting
+     * @throws NullPointerException
+     *             if {@code bootstrapServers} is null
+     */
+    public KafkaConsumerFactory(String bootstrapServers) {
+        Objects.requireNonNull(bootstrapServers, "bootstrapServers cannot be null");
+        this.baseProperties = new Properties();
+        this.baseProperties.setProperty("bootstrap.servers", bootstrapServers);
+    }
+
+    /**
+     * Creates a factory for consumers configured by the given properties. The
+     * object is kept by reference and read each time a consumer is created, so
+     * later changes to it affect consumers created afterwards.
+     *
+     * @param baseProperties
+     *            Kafka consumer settings (presumably including
+     *            {@code bootstrap.servers}); {@code group.id},
+     *            {@code key.deserializer} and {@code value.deserializer} are
+     *            always overridden by this factory
+     * @throws NullPointerException
+     *             if {@code baseProperties} is null
+     */
+    public KafkaConsumerFactory(Properties baseProperties) {
+        this.baseProperties = Objects.requireNonNull(baseProperties, "baseProperties cannot be null");
+    }
+
+    @Override
+    public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) {
+        final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis();
+
+        // Copy the base settings as real entries. Passing them as defaults
+        // (new Properties(baseProperties)) does not work: KafkaConsumer copies
+        // its configuration with putAll(), which ignores default values, so
+        // e.g. bootstrap.servers would be lost.
+        final Properties properties = new Properties();
+        properties.putAll(baseProperties);
+        for (String name : baseProperties.stringPropertyNames()) {
+            // also carry over any defaults that baseProperties itself has
+            properties.putIfAbsent(name, baseProperties.getProperty(name));
+        }
+        properties.setProperty("group.id", groupId);
+        properties.setProperty("key.deserializer", deserializerForClass(keyClass).getName());
+        properties.setProperty("value.deserializer", deserializerForClass(valueClass).getName());
+        return new KafkaConsumer<>(properties);
+    }
+
+    /**
+     * Chooses the Kafka deserializer for a requested key or value type. Types
+     * without a dedicated deserializer fall back to
+     * {@link ByteArrayDeserializer}, i.e. the raw record bytes are returned.
+     *
+     * @param cls
+     *            the requested key or value class (boxed or primitive)
+     * @return the deserializer class to configure
+     */
+    private static Class<? extends Deserializer<?>> deserializerForClass(Class<?> cls) {
+        if (cls == String.class || cls == CharSequence.class) {
+            return StringDeserializer.class;
+        }
+        if (cls == Double.class || cls == double.class) {
+            return DoubleDeserializer.class;
+        }
+        if (cls == Integer.class || cls == int.class) {
+            return IntegerDeserializer.class;
+        }
+        if (cls == Float.class || cls == float.class) {
+            return FloatDeserializer.class;
+        }
+        if (cls == Long.class || cls == long.class) {
+            return LongDeserializer.class;
+        }
+        if (cls == Short.class || cls == short.class) {
+            return ShortDeserializer.class;
+        }
+        if (cls == Bytes.class) {
+            return BytesDeserializer.class;
+        }
+        if (cls == ByteBuffer.class) {
+            return ByteBufferDeserializer.class;
+        }
+        // byte[], Object and any other type get the raw bytes.
+        // NOTE: for Byte[] the delivered values are byte[], not Byte[].
+        return ByteArrayDeserializer.class;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java new file mode 100644 index 0000000..f6cd664 --- /dev/null +++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.metamodel.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.MaxRowsDataSet;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.ColumnTypeImpl;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+
+/**
+ * DataContext for Apache Kafka. Every topic becomes a table in a single schema
+ * named "kafka", with the columns partition, offset, timestamp, key and value.
+ * Reading a table consumes all partitions of the topic from the beginning with
+ * a consumer obtained from the configured {@link ConsumerFactory}.
+ *
+ * @param <K>
+ *            the type of record keys
+ * @param <V>
+ *            the type of record values
+ */
+public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
+
+    /**
+     * System property holding the consumer poll timeout in milliseconds
+     * (default 1000). Reading a topic stops at the first poll that returns no
+     * records within this timeout.
+     */
+    public static final String SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT = "metamodel.kafka.consumer.poll.timeout";
+
+    public static final String COLUMN_PARTITION = "partition";
+    public static final String COLUMN_OFFSET = "offset";
+    public static final String COLUMN_TIMESTAMP = "timestamp";
+    public static final String COLUMN_KEY = "key";
+    public static final String COLUMN_VALUE = "value";
+
+    private final Class<K> keyClass;
+    private final Class<V> valueClass;
+    private final ConsumerFactory consumerFactory;
+    private final Supplier<Collection<String>> topicSupplier;
+
+    /**
+     * Creates a data context for a fixed collection of topics, using a
+     * {@link KafkaConsumerFactory} for the given bootstrap servers.
+     *
+     * @param keyClass
+     *            the type of record keys
+     * @param valueClass
+     *            the type of record values
+     * @param bootstrapServers
+     *            the Kafka {@code bootstrap.servers} setting
+     * @param topics
+     *            the topics to expose as tables
+     */
+    public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, String bootstrapServers,
+            Collection<String> topics) {
+        this(keyClass, valueClass, new KafkaConsumerFactory(bootstrapServers), () -> topics);
+    }
+
+    /**
+     * Creates a data context with a custom consumer factory and topic source.
+     *
+     * @param keyClass
+     *            the type of record keys
+     * @param valueClass
+     *            the type of record values
+     * @param consumerFactory
+     *            creates one consumer per table read
+     * @param topicSupplier
+     *            supplies the topic names whenever the schema is built
+     * @throws NullPointerException
+     *             if {@code consumerFactory} or {@code topicSupplier} is null
+     */
+    public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, ConsumerFactory consumerFactory,
+            Supplier<Collection<String>> topicSupplier) {
+        this.keyClass = keyClass;
+        this.valueClass = valueClass;
+        this.consumerFactory = Objects.requireNonNull(consumerFactory, "consumerFactory cannot be null");
+        this.topicSupplier = Objects.requireNonNull(topicSupplier, "topicSupplier cannot be null");
+    }
+
+    @Override
+    protected Schema getMainSchema() throws MetaModelException {
+        final MutableSchema schema = new MutableSchema(getMainSchemaName());
+
+        final Collection<String> topics = topicSupplier.get();
+
+        // one table per topic; key/value column types follow the configured classes
+        for (String topic : topics) {
+            final MutableTable table = new MutableTable(topic, schema);
+            table.addColumn(new MutableColumn(COLUMN_PARTITION, ColumnType.INTEGER));
+            table.addColumn(new MutableColumn(COLUMN_OFFSET, ColumnType.BIGINT));
+            table.addColumn(new MutableColumn(COLUMN_TIMESTAMP, ColumnType.TIMESTAMP));
+            table.addColumn(new MutableColumn(COLUMN_KEY, ColumnTypeImpl.convertColumnType(keyClass)));
+            table.addColumn(new MutableColumn(COLUMN_VALUE, ColumnTypeImpl.convertColumnType(valueClass)));
+            schema.addTable(table);
+        }
+        return schema;
+    }
+
+    @Override
+    protected String getMainSchemaName() throws MetaModelException {
+        return "kafka";
+    }
+
+    /**
+     * Reads a topic by creating a dedicated consumer, assigning it all of the
+     * topic's partitions and seeking to their beginning. The returned data set
+     * owns the consumer and closes it when the data set is closed.
+     *
+     * @throws MetaModelException
+     *             if no partitions can be found for the topic
+     */
+    @Override
+    protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
+        final String topic = table.getName();
+        final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass);
+        try {
+            final List<TopicPartition> partitions = getTopicPartitions(consumer, topic);
+
+            // partitions must be assigned before seeking: seekToBeginning() on an
+            // unassigned partition throws IllegalStateException
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            final DataSet dataSet = new KafkaDataSet<K, V>(consumer, columns);
+            if (maxRows > 0) {
+                return new MaxRowsDataSet(dataSet, maxRows);
+            }
+            return dataSet;
+        } catch (RuntimeException e) {
+            // do not leak the consumer when the read cannot be set up
+            try {
+                consumer.close();
+            } catch (RuntimeException closeException) {
+                e.addSuppressed(closeException);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Looks up all partitions of a topic.
+     *
+     * @throws MetaModelException
+     *             if no partition information is available for the topic
+     */
+    private static List<TopicPartition> getTopicPartitions(Consumer<?, ?> consumer, String topic) {
+        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        // an unknown topic may yield null or an empty list, depending on the client version
+        if (partitionInfos == null || partitionInfos.isEmpty()) {
+            throw new MetaModelException("No partitions found for Kafka topic: " + topic);
+        }
+        return partitionInfos.stream()
+                .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
+                .collect(Collectors.toList());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java new file mode 100644 index 0000000..90dcf38 --- /dev/null +++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.metamodel.kafka;
+
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.CachingDataSetHeader;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+
+/**
+ * DataSet over the records delivered by a Kafka {@link Consumer}. Records are
+ * fetched batch by batch with {@link Consumer#poll(long)}; the data set ends at
+ * the first poll that returns no records within the timeout configured via the
+ * {@value KafkaDataContext#SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT} system
+ * property (default 1000). Closing the data set closes the consumer.
+ */
+final class KafkaDataSet<K, V> extends AbstractDataSet {
+
+    private final Consumer<K, V> consumer;
+    private final long pollTimeout;
+
+    private Iterator<ConsumerRecord<K, V>> currentIterator;
+    private ConsumerRecord<K, V> currentRow;
+    private boolean closed;
+
+    /**
+     * @param consumer
+     *            the consumer to read from; expected to be assigned and
+     *            positioned already (as done by KafkaDataContext). This data
+     *            set takes ownership of it and closes it in {@link #close()}.
+     * @param columns
+     *            the columns to expose, in select order
+     * @throws NumberFormatException
+     *             if the poll timeout system property is not a valid long
+     */
+    public KafkaDataSet(Consumer<K, V> consumer, List<Column> columns) {
+        super(new CachingDataSetHeader(columns.stream().map(col -> new SelectItem(col)).collect(Collectors.toList())));
+        this.consumer = consumer;
+        this.pollTimeout = Long.parseLong(System.getProperty(KafkaDataContext.SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT,
+                "1000"));
+    }
+
+    @Override
+    public boolean next() {
+        if (currentIterator == null || !currentIterator.hasNext()) {
+            final ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
+            if (records == null || records.isEmpty()) {
+                // nothing arrived within the poll timeout: treat it as the end of
+                // the data, and do not keep exposing the last record
+                currentRow = null;
+                return false;
+            }
+            currentIterator = records.iterator();
+        }
+
+        currentRow = currentIterator.next();
+        return currentRow != null;
+    }
+
+    @Override
+    public Row getRow() {
+        final Object[] values = getHeader().getSelectItems().stream().map(this::getValue).toArray();
+
+        return new DefaultRow(getHeader(), values);
+    }
+
+    /**
+     * Extracts the value for one select item from the current record, or
+     * {@code null} when there is no current record or the column is unknown.
+     */
+    private Object getValue(SelectItem selectItem) {
+        if (currentRow == null) {
+            return null;
+        }
+        switch (selectItem.getColumn().getName()) {
+        case KafkaDataContext.COLUMN_PARTITION:
+            return currentRow.partition();
+        case KafkaDataContext.COLUMN_OFFSET:
+            return currentRow.offset();
+        case KafkaDataContext.COLUMN_TIMESTAMP:
+            return toDate(currentRow.timestamp());
+        case KafkaDataContext.COLUMN_KEY:
+            return currentRow.key();
+        case KafkaDataContext.COLUMN_VALUE:
+            return currentRow.value();
+        default:
+            return null;
+        }
+    }
+
+    /**
+     * Converts a Kafka record timestamp (epoch milliseconds) into a
+     * {@link Date}, matching the TIMESTAMP type the column is declared with in
+     * KafkaDataContext. Kafka reports -1 ({@code ConsumerRecord.NO_TIMESTAMP})
+     * for records without a timestamp; those become {@code null}.
+     */
+    private static Date toDate(long timestamp) {
+        return timestamp < 0 ? null : new Date(timestamp);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        currentRow = null;
+        currentIterator = null;
+        // Release the consumer only once: unsubscribe() on an already closed
+        // KafkaConsumer throws IllegalStateException, so a repeated close() of
+        // this data set must not reach it again.
+        if (!closed) {
+            closed = true;
+            try {
+                consumer.unsubscribe();
+            } finally {
+                consumer.close();
+            }
+        }
+    }
+}
