Repository: metamodel
Updated Branches:
  refs/heads/master 0f7a09296 -> a868f71c0


Initial draft at a Kafka module for Apache MetaModel

Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/5b8176e1
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/5b8176e1
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/5b8176e1

Branch: refs/heads/master
Commit: 5b8176e18bd77e15317c6d7f7c22f7916a2db7c6
Parents: fe02a59
Author: Kasper Sørensen <[email protected]>
Authored: Sat Jan 27 17:45:32 2018 -0800
Committer: Kasper Sørensen <[email protected]>
Committed: Sat Jan 27 17:45:32 2018 -0800

----------------------------------------------------------------------
 .../org/apache/metamodel/MetaModelHelper.java   |   2 +-
 .../apache/metamodel/data/AbstractDataSet.java  |   1 -
 kafka/pom.xml                                   |  46 ++++++++
 .../apache/metamodel/kafka/ConsumerFactory.java |  32 ++++++
 .../metamodel/kafka/KafkaConsumerFactory.java   |  97 ++++++++++++++++
 .../metamodel/kafka/KafkaDataContext.java       | 111 +++++++++++++++++++
 .../apache/metamodel/kafka/KafkaDataSet.java    | 100 +++++++++++++++++
 7 files changed, 387 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/core/src/main/java/org/apache/metamodel/MetaModelHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/metamodel/MetaModelHelper.java 
b/core/src/main/java/org/apache/metamodel/MetaModelHelper.java
index 59900f9..c484736 100644
--- a/core/src/main/java/org/apache/metamodel/MetaModelHelper.java
+++ b/core/src/main/java/org/apache/metamodel/MetaModelHelper.java
@@ -769,7 +769,7 @@ public final class MetaModelHelper {
     }
 
     public static SelectItem[] createSelectItems(Column... columns) {
-        SelectItem[] items = new SelectItem[columns.length];
+        final SelectItem[] items = new SelectItem[columns.length];
         for (int i = 0; i < items.length; i++) {
             items[i] = new SelectItem(columns[i]);
         }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java 
b/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java
index 33238a0..d382dc6 100644
--- a/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java
+++ b/core/src/main/java/org/apache/metamodel/data/AbstractDataSet.java
@@ -62,7 +62,6 @@ public abstract class AbstractDataSet extends BaseObject 
implements DataSet {
         _header = Objects.requireNonNull(header);
     }
 
-
     /**
      * {@inheritDoc}
      */

http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
new file mode 100644
index 0000000..0227218
--- /dev/null
+++ b/kafka/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+       license agreements. See the NOTICE file distributed with this work for additional
+       information regarding copyright ownership. The ASF licenses this file to
+       you under the Apache License, Version 2.0 (the "License"); you may not use
+       this file except in compliance with the License. You may obtain a copy of
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+       by applicable law or agreed to in writing, software distributed under the
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+       OF ANY KIND, either express or implied. See the License for the specific
+       language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       <parent>
+               <artifactId>MetaModel</artifactId>
+               <groupId>org.apache.metamodel</groupId>
+               <version>5.1.0-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+       <artifactId>MetaModel-kafka</artifactId>
+       <name>MetaModel module for Apache Kafka</name>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.metamodel</groupId>
+                       <artifactId>MetaModel-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+                       <version>1.0.0</version>
+               </dependency>
+
+               <!-- Test dependencies -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-nop</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java 
b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java
new file mode 100644
index 0000000..efabf38
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+/**
+ * Factory interface for Kafka {@link Consumer} objects to be used by Apache
+ * MetaModel. Since Apache MetaModel may potentially serve multiple queries at
+ * the same times, multiple consumers may be needed. This class determines how
+ * these are instantiated.
+ */
+public interface ConsumerFactory {
+
+    /**
+     * Builds a Kafka consumer that MetaModel will use to read a single topic.
+     *
+     * @param topic name of the Kafka topic the consumer is meant for
+     * @param keyClass Java type that record keys should be deserialized into
+     * @param valueClass Java type that record values should be deserialized into
+     * @param <K> record key type
+     * @param <V> record value type
+     * @return a newly created consumer
+     */
+    <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass);
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java 
b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
new file mode 100644
index 0000000..6cfd101
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.FloatDeserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.ShortDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * Default {@link ConsumerFactory} implementation.
+ */
+public class KafkaConsumerFactory implements ConsumerFactory {
+
+    private final Properties baseProperties;
+
+    public KafkaConsumerFactory(String bootstrapServers) {
+        this.baseProperties = new Properties();
+        this.baseProperties.setProperty("bootstrap.servers", bootstrapServers);
+    }
+
+    public KafkaConsumerFactory(Properties baseProperties) {
+        this.baseProperties = baseProperties;
+    }
+
+    @Override
+    public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> 
keyClass, Class<V> valueClass) {
+        final String groupId = "apache_metamodel_" + topic + "_" + 
System.currentTimeMillis();
+
+        final Properties properties = new Properties(baseProperties);
+        properties.setProperty("group.id", groupId);
+        properties.setProperty("key.deserializer", 
deserializerForClass(keyClass).getName());
+        properties.setProperty("value.deserializer", 
deserializerForClass(keyClass).getName());
+        return new KafkaConsumer<>(properties);
+    }
+
+    private static Class<? extends Deserializer<?>> 
deserializerForClass(Class<?> cls) {
+        if (cls == String.class || cls == CharSequence.class) {
+            return StringDeserializer.class;
+        }
+        if (cls == Double.class) {
+            return DoubleDeserializer.class;
+        }
+        if (cls == Integer.class) {
+            return IntegerDeserializer.class;
+        }
+        if (cls == Float.class) {
+            return FloatDeserializer.class;
+        }
+        if (cls == Long.class) {
+            return LongDeserializer.class;
+        }
+        if (cls == Short.class) {
+            return ShortDeserializer.class;
+        }
+        if (cls == Bytes.class) {
+            return BytesDeserializer.class;
+        }
+        if (cls == ByteBuffer.class) {
+            return ByteBufferDeserializer.class;
+        }
+        if (cls == byte[].class || cls == Byte[].class || cls == Object.class) 
{
+            return ByteArrayDeserializer.class;
+        }
+        // fall back to doing nothing
+        return ByteArrayDeserializer.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java 
b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
new file mode 100644
index 0000000..f6cd664
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.MaxRowsDataSet;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.ColumnTypeImpl;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+
+public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
+
+    public static final String SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT = 
"metamodel.kafka.consumer.poll.timeout";
+
+    public static final String COLUMN_PARTITION = "partition";
+    public static final String COLUMN_OFFSET = "offset";
+    public static final String COLUMN_TIMESTAMP = "timestamp";
+    public static final String COLUMN_KEY = "key";
+    public static final String COLUMN_VALUE = "value";
+
+    private final Class<K> keyClass;
+    private final Class<V> valueClass;
+    private final ConsumerFactory consumerFactory;
+    private final Supplier<Collection<String>> topicSupplier;
+
+    public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, String 
bootstrapServers,
+            Collection<String> topics) {
+        this(keyClass, valueClass, new KafkaConsumerFactory(bootstrapServers), 
() -> topics);
+    }
+
+    public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, 
ConsumerFactory consumerFactory,
+            Supplier<Collection<String>> topicSupplier) {
+        this.keyClass = keyClass;
+        this.valueClass = valueClass;
+        this.consumerFactory = consumerFactory;
+        this.topicSupplier = topicSupplier;
+    }
+
+    @Override
+    protected Schema getMainSchema() throws MetaModelException {
+        final MutableSchema schema = new MutableSchema(getMainSchemaName());
+
+        final Collection<String> topics = topicSupplier.get();
+
+        for (String topic : topics) {
+            final MutableTable table = new MutableTable(topic, schema);
+            table.addColumn(new MutableColumn(COLUMN_PARTITION, 
ColumnType.INTEGER));
+            table.addColumn(new MutableColumn(COLUMN_OFFSET, 
ColumnType.BIGINT));
+            table.addColumn(new MutableColumn(COLUMN_TIMESTAMP, 
ColumnType.TIMESTAMP));
+            table.addColumn(new MutableColumn(COLUMN_KEY, 
ColumnTypeImpl.convertColumnType(keyClass)));
+            table.addColumn(new MutableColumn(COLUMN_VALUE, 
ColumnTypeImpl.convertColumnType(valueClass)));
+            schema.addTable(table);
+        }
+        return schema;
+    }
+
+    @Override
+    protected String getMainSchemaName() throws MetaModelException {
+        return "kafka";
+    }
+
+    @Override
+    protected DataSet materializeMainSchemaTable(Table table, List<Column> 
columns, int maxRows) {
+        final String topic = table.getName();
+        final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, 
keyClass, valueClass);
+        final List<PartitionInfo> partitionInfos = 
consumer.partitionsFor(topic);
+        List<TopicPartition> partitions = 
partitionInfos.stream().map(partitionInfo -> {
+            return new TopicPartition(topic, partitionInfo.partition());
+        }).collect(Collectors.toList());
+
+        consumer.seekToBeginning(partitions);
+        consumer.assign(partitions);
+
+        if (maxRows > 0) {
+            return new MaxRowsDataSet(new KafkaDataSet<K, V>(consumer, 
columns), maxRows);
+        }
+        return new KafkaDataSet<K, V>(consumer, columns);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/5b8176e1/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java 
b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
new file mode 100644
index 0000000..90dcf38
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.CachingDataSetHeader;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+
+final class KafkaDataSet<K, V> extends AbstractDataSet {
+
+    private final Consumer<K, V> consumer;
+    private final long pollTimeout;
+
+    private Iterator<ConsumerRecord<K, V>> currentIterator;
+    private ConsumerRecord<K, V> currentRow;
+
+    public KafkaDataSet(Consumer<K, V> consumer, List<Column> columns) {
+        super(new CachingDataSetHeader(columns.stream().map(col -> new 
SelectItem(col)).collect(Collectors.toList())));
+        this.consumer = consumer;
+        this.pollTimeout = 
Long.parseLong(System.getProperty(KafkaDataContext.SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT,
+                "1000"));
+    }
+
+    @Override
+    public boolean next() {
+        if (currentIterator == null || !currentIterator.hasNext()) {
+            final ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
+            if (records == null || records.isEmpty()) {
+                return false;
+            }
+            currentIterator = records.iterator();
+        }
+
+        this.currentRow = currentIterator.next();
+        if (currentRow == null) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public Row getRow() {
+        final Object[] values = 
getHeader().getSelectItems().stream().map(this::getValue).toArray();
+
+        return new DefaultRow(getHeader(), values);
+    }
+
+    private Object getValue(SelectItem selectItem) {
+        if (currentRow == null) {
+            return null;
+        }
+        switch (selectItem.getColumn().getName()) {
+        case KafkaDataContext.COLUMN_PARTITION:
+            return currentRow.partition();
+        case KafkaDataContext.COLUMN_OFFSET:
+            return currentRow.offset();
+        case KafkaDataContext.COLUMN_TIMESTAMP:
+            return currentRow.timestamp();
+        case KafkaDataContext.COLUMN_KEY:
+            return currentRow.key();
+        case KafkaDataContext.COLUMN_VALUE:
+            return currentRow.value();
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        currentRow = null;
+        consumer.unsubscribe();
+        consumer.close();
+    }
+}

Reply via email to