This is an automated email from the ASF dual-hosted git repository.
sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new ca6dc99 [CALCITE-2913] Adapter for Apache Kafka (Mingmin Xu) Expose
an Apache Kafka topic as a stream table.
ca6dc99 is described below
commit ca6dc99a4625b9d2708f73b523da3dd41d71f73b
Author: mingmxu <[email protected]>
AuthorDate: Fri May 24 09:54:11 2019 -0700
[CALCITE-2913] Adapter for Apache Kafka (Mingmin Xu)
Expose an Apache Kafka topic as a stream table.
---
kafka/pom.xml | 99 ++++++++++++++++++
.../adapter/kafka/KafkaMessageEnumerator.java | 91 ++++++++++++++++
.../calcite/adapter/kafka/KafkaRowConverter.java | 51 +++++++++
.../adapter/kafka/KafkaRowConverterImpl.java | 68 ++++++++++++
.../calcite/adapter/kafka/KafkaStreamTable.java | 115 +++++++++++++++++++++
.../calcite/adapter/kafka/KafkaTableConstants.java | 29 ++++++
.../calcite/adapter/kafka/KafkaTableFactory.java | 101 ++++++++++++++++++
.../calcite/adapter/kafka/KafkaTableOptions.java | 80 ++++++++++++++
.../apache/calcite/adapter/kafka/package-info.java | 28 +++++
.../calcite/adapter/kafka/KafkaAdapterTest.java | 112 ++++++++++++++++++++
.../calcite/adapter/kafka/KafkaMockConsumer.java | 48 +++++++++
.../adapter/kafka/KafkaRowConverterTest.java | 63 +++++++++++
kafka/src/test/resources/kafka.model.json | 48 +++++++++
pom.xml | 8 ++
site/_docs/adapter.md | 1 +
site/_docs/kafka_adapter.md | 103 ++++++++++++++++++
sqlline | 2 +-
17 files changed, 1046 insertions(+), 1 deletion(-)
diff --git a/kafka/pom.xml b/kafka/pom.xml
new file mode 100644
index 0000000..98658b7
--- /dev/null
+++ b/kafka/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite</artifactId>
+ <version>1.20.0-SNAPSHOT</version>
+ </parent>
+
+ <!-- The basics. -->
+ <artifactId>calcite-kafka</artifactId>
+ <packaging>jar</packaging>
+ <name>calcite kafka</name>
+ <description>Kafka Adapter. Exposes kafka topic(s) as stream
table(s).</description>
+
+ <properties>
+ <top.dir>${project.basedir}/..</top.dir>
+ <build.timestamp>${maven.build.timestamp}</build.timestamp>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-compiler</artifactId>
+ <groupId>org.codehaus.janino</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-linq4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>${maven-dependency-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+
<outputDirectory>${project.build.directory}/dependencies/</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java
new file mode 100644
index 0000000..fb9fb03
--- /dev/null
+++
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.linq4j.Enumerator;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Enumerator to read data from {@link Consumer},
+ * and converted into SQL rows with {@link KafkaRowConverter}.
+ * @param <K>: type for Kafka message key,
+ * refer to {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG};
+ * @param <V>: type for Kafka message value,
+ * refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG};
+ */
+public class KafkaMessageEnumerator<K, V> implements Enumerator<Object[]> {
+ final Consumer consumer;
+ final KafkaRowConverter<K, V> rowConverter;
+ private final AtomicBoolean cancelFlag;
+
+ //runtime
+ private final LinkedList<ConsumerRecord<K, V>> bufferedRecords = new
LinkedList<>();
+ private ConsumerRecord<K, V> curRecord;
+
+ KafkaMessageEnumerator(final Consumer consumer,
+ final KafkaRowConverter<K, V> rowConverter,
+ final AtomicBoolean cancelFlag) {
+ this.consumer = consumer;
+ this.rowConverter = rowConverter;
+ this.cancelFlag = cancelFlag;
+ }
+
+ /**
+ * It returns an Array of Object, with each element represents a field of
row.
+ */
+ @Override public Object[] current() {
+ return rowConverter.toRow(curRecord);
+ }
+
+ @Override public boolean moveNext() {
+ if (cancelFlag.get()) {
+ return false;
+ }
+
+ while (bufferedRecords.isEmpty()) {
+ pullRecords();
+ }
+
+ curRecord = bufferedRecords.removeFirst();
+ return true;
+ }
+
+ private void pullRecords() {
+ ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord record : records) {
+ bufferedRecords.add(record);
+ }
+ }
+
+ @Override public void reset() {
+ this.bufferedRecords.clear();
+ pullRecords();
+ }
+
+ @Override public void close() {
+ consumer.close();
+ }
+}
+// End KafkaMessageEnumerator.java
diff --git
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverter.java
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverter.java
new file mode 100644
index 0000000..0f35cdf
--- /dev/null
+++
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Interface to handle formatting between Kafka message and Calcite row.
+ *
+ * @param <K>: type for Kafka message key,
+ * refer to {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG};
+ * @param <V>: type for Kafka message value,
+ * refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG};
+ *
+ */
+public interface KafkaRowConverter<K, V> {
+
+ /**
+ * Generate row type for a given Kafka topic.
+ *
+ * @param topicName, Kafka topic name;
+ * @return row type
+ */
+ RelDataType rowDataType(String topicName);
+
+ /**
+ * Parse and reformat Kafka message from consumer,
+ * to align with row type defined as {@link #rowDataType(String)}.
+ * @param message, the raw Kafka message record;
+ * @return fields in the row
+ */
+ Object[] toRow(ConsumerRecord<K, V> message);
+}
+// End KafkaRowConverter.java
diff --git
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java
new file mode 100644
index 0000000..a659e73
--- /dev/null
+++
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Default implementation of {@link KafkaRowConverter}, both key and value are
byte[].
+ */
+public class KafkaRowConverterImpl implements KafkaRowConverter<byte[],
byte[]> {
+ /**
+ * Generate row schema for a given Kafka topic.
+ *
+ * @param topicName, Kafka topic name;
+ * @return row type
+ */
+ @Override public RelDataType rowDataType(final String topicName) {
+ final RelDataTypeFactory typeFactory =
+ new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
+ fieldInfo.add("MSG_PARTITION",
typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
+ fieldInfo.add("MSG_TIMESTAMP",
typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
+ fieldInfo.add("MSG_OFFSET",
typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
+ fieldInfo.add("MSG_KEY_BYTES",
typeFactory.createSqlType(SqlTypeName.VARBINARY)).nullable(true);
+ fieldInfo.add("MSG_VALUE_BYTES",
typeFactory.createSqlType(SqlTypeName.VARBINARY))
+ .nullable(false);
+
+ return fieldInfo.build();
+ }
+
+ /**
+ * Parse and reformat Kafka message from consumer, to align with row schema
+ * defined as {@link #rowDataType(String)}.
+ * @param message, the raw Kafka message record;
+ * @return fields in the row
+ */
+ @Override public Object[] toRow(final ConsumerRecord<byte[], byte[]>
message) {
+ Object[] fields = new Object[5];
+ fields[0] = message.partition();
+ fields[1] = message.timestamp();
+ fields[2] = message.offset();
+ fields[3] = message.key();
+ fields[4] = message.value();
+
+ return fields;
+ }
+}
+// End KafkaRowConverterImpl.java
diff --git
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java
new file mode 100644
index 0000000..ddacfc1
--- /dev/null
+++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A table which maps to an Apache Kafka topic, currently only {@link
KafkaStreamTable} is
+ * implemented as a STREAM table.
+ */
+public class KafkaStreamTable implements ScannableTable, StreamableTable {
+ final KafkaTableOptions tableOptions;
+
+ KafkaStreamTable(final KafkaTableOptions tableOptions) {
+ this.tableOptions = tableOptions;
+ }
+
+ @Override public Enumerable<Object[]> scan(final DataContext root) {
+ final AtomicBoolean cancelFlag =
DataContext.Variable.CANCEL_FLAG.get(root);
+ return new AbstractEnumerable<Object[]>() {
+ public Enumerator<Object[]> enumerator() {
+ if (tableOptions.getConsumer() != null) {
+ return new KafkaMessageEnumerator(tableOptions.getConsumer(),
+ tableOptions.getRowConverter(), cancelFlag);
+ }
+
+ Properties consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ tableOptions.getBootstrapServers());
+ //by default it's <byte[], byte[]>
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+ if (tableOptions.getConsumerParams() != null) {
+ consumerConfig.putAll(tableOptions.getConsumerParams());
+ }
+ Consumer consumer = new KafkaConsumer<>(consumerConfig);
+
consumer.subscribe(Collections.singletonList(tableOptions.getTopicName()));
+
+ return new KafkaMessageEnumerator(consumer,
tableOptions.getRowConverter(), cancelFlag);
+ }
+ };
+ }
+
+ @Override public RelDataType getRowType(final RelDataTypeFactory
typeFactory) {
+ return
tableOptions.getRowConverter().rowDataType(tableOptions.getTopicName());
+ }
+
+ /** Returns a provider of statistics about this table. */
+ @Override public Statistic getStatistic() {
+ return Statistics.of(100d, ImmutableList.of(),
+ RelCollations.createSingleton(0));
+ }
+
+ @Override public boolean isRolledUp(final String column) {
+ return false;
+ }
+
+ @Override public boolean rolledUpColumnValidInsideAgg(final String column,
final SqlCall call,
+ final SqlNode parent,
+ final CalciteConnectionConfig config) {
+ return false;
+ }
+
+ @Override public Table stream() {
+ return this;
+ }
+
+ /** Type of table. */
+ @Override public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.STREAM;
+ }
+}
+// End KafkaStreamTable.java
diff --git
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableConstants.java
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableConstants.java
new file mode 100644
index 0000000..aa0582f
--- /dev/null
+++
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+/**
+ * Parameter constants used to define a Kafka table.
+ */
+interface KafkaTableConstants {
+ String SCHEMA_TOPIC_NAME = "topic.name";
+ String SCHEMA_BOOTSTRAP_SERVERS = "bootstrap.servers";
+ String SCHEMA_ROW_CONVERTER = "row.converter";
+ String SCHEMA_CUST_CONSUMER = "consumer.cust";
+ String SCHEMA_CONSUMER_PARAMS = "consumer.params";
+}
+// End KafkaTableConstants.java
diff --git
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableFactory.java
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableFactory.java
new file mode 100644
index 0000000..1f1252d
--- /dev/null
+++
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TableFactory;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Implementation of {@link TableFactory} for Apache Kafka. Currently an
Apache Kafka
+ * topic is mapping to a STREAM table.
+ */
+public class KafkaTableFactory implements TableFactory<KafkaStreamTable> {
+ public KafkaTableFactory() {
+ }
+
+ /** Creates a Table.
+ * @param schema Schema this table belongs to
+ * @param name Name of this table
+ * @param operand The "operand" JSON property
+ * @param rowType Row type. Specified if the "columns" JSON property.
+ */
+ @Override public KafkaStreamTable create(SchemaPlus schema,
+ String name,
+ Map<String, Object> operand,
+ RelDataType rowType) {
+ KafkaTableOptions tableOptionBuilder = new KafkaTableOptions();
+
+ tableOptionBuilder.setBootstrapServers(
+ (String)
operand.getOrDefault(KafkaTableConstants.SCHEMA_BOOTSTRAP_SERVERS, null));
+ tableOptionBuilder.setTopicName(
+ (String) operand.getOrDefault(KafkaTableConstants.SCHEMA_TOPIC_NAME,
null));
+
+ KafkaRowConverter rowConverter = null;
+ if (operand.containsKey(KafkaTableConstants.SCHEMA_ROW_CONVERTER)) {
+ String rowConverterClass = (String)
operand.get(KafkaTableConstants.SCHEMA_ROW_CONVERTER);
+ try {
+ rowConverter = (KafkaRowConverter)
Class.forName(rowConverterClass).newInstance();
+ } catch (InstantiationException | IllegalAccessException |
ClassNotFoundException e) {
+ final String details = String.format(
+ Locale.ROOT,
+ "Fail to create table '%s' with configuration: \n"
+ + "'%s'\n"
+ + "KafkaRowConverter '%s' is invalid",
+ name, operand, rowConverterClass);
+ throw new RuntimeException(details, e);
+ }
+ } else {
+ rowConverter = new KafkaRowConverterImpl();
+ }
+ tableOptionBuilder.setRowConverter(rowConverter);
+
+ if (operand.containsKey(KafkaTableConstants.SCHEMA_CONSUMER_PARAMS)) {
+ tableOptionBuilder.setConsumerParams((Map<String, String>) operand.get(
+ KafkaTableConstants.SCHEMA_CONSUMER_PARAMS));
+ }
+ if (operand.containsKey(KafkaTableConstants.SCHEMA_CUST_CONSUMER)) {
+ String custConsumerClass = (String)
operand.get(KafkaTableConstants.SCHEMA_CUST_CONSUMER);
+ try {
+ tableOptionBuilder.setConsumer(
+ (Consumer) Class.forName(custConsumerClass)
+ .getConstructor(OffsetResetStrategy.class)
+ .newInstance(OffsetResetStrategy.NONE)
+ );
+ } catch (ClassNotFoundException | NoSuchMethodException |
IllegalAccessException
+ | InstantiationException | InvocationTargetException e) {
+ final String details = String.format(
+ Locale.ROOT,
+ "Fail to create table '%s' with configuration: \n"
+ + "'%s'\n"
+ + "KafkaCustConsumer '%s' is invalid",
+ name, operand, custConsumerClass);
+ throw new RuntimeException(details, e);
+ }
+ }
+
+ return new KafkaStreamTable(tableOptionBuilder);
+ }
+}
+// End KafkaTableFactory.java
diff --git
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableOptions.java
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableOptions.java
new file mode 100644
index 0000000..230689e
--- /dev/null
+++
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableOptions.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+import java.util.Map;
+
+/**
+ * Available options for {@link KafkaStreamTable}.
+ */
+public final class KafkaTableOptions {
+ private String bootstrapServers;
+ private String topicName;
+ private KafkaRowConverter rowConverter;
+ private Map<String, String> consumerParams;
+ //added to inject MockConsumer for testing.
+ private Consumer consumer;
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public KafkaTableOptions setBootstrapServers(final String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ return this;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public KafkaTableOptions setTopicName(final String topicName) {
+ this.topicName = topicName;
+ return this;
+ }
+
+ public KafkaRowConverter getRowConverter() {
+ return rowConverter;
+ }
+
+ public KafkaTableOptions setRowConverter(
+ final KafkaRowConverter rowConverter) {
+ this.rowConverter = rowConverter;
+ return this;
+ }
+
+ public Map<String, String> getConsumerParams() {
+ return consumerParams;
+ }
+
+ public KafkaTableOptions setConsumerParams(final Map<String, String>
consumerParams) {
+ this.consumerParams = consumerParams;
+ return this;
+ }
+
+ public Consumer getConsumer() {
+ return consumer;
+ }
+
+ public KafkaTableOptions setConsumer(final Consumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+}
+// End KafkaTableOptions.java
diff --git
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/package-info.java
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/package-info.java
new file mode 100644
index 0000000..a303c70
--- /dev/null
+++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka query provider.
+ *
+ * <p>One Kafka topic is mapping to one STREAM table.</p>
+ */
+@PackageMarker
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java
diff --git
a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
new file mode 100644
index 0000000..79709cc
--- /dev/null
+++ b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.test.CalciteAssert;
+
+import com.google.common.io.Resources;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+/**
+ * Unit test cases for Kafka adapter.
+ */
+public class KafkaAdapterTest {
+ protected static final URL MODEL =
KafkaAdapterTest.class.getResource("/kafka.model.json");
+
+ private CalciteAssert.AssertThat assertModel(String model) {
+ // ensure that Schema from this instance is being used
+ model = model.replace(KafkaAdapterTest.class.getName(),
KafkaAdapterTest.class.getName());
+
+ return CalciteAssert.that()
+ .withModel(model);
+ }
+
+ private CalciteAssert.AssertThat assertModel(URL url) {
+ Objects.requireNonNull(url, "url");
+ try {
+ return assertModel(Resources.toString(url, StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Test public void testSelect() {
+ assertModel(MODEL)
+ .query("SELECT STREAM * FROM KAFKA.MOCKTABLE")
+ .limit(2)
+
+ .typeIs("[MSG_PARTITION INTEGER NOT NULL"
+ + ", MSG_TIMESTAMP BIGINT NOT NULL"
+ + ", MSG_OFFSET BIGINT NOT NULL"
+ + ", MSG_KEY_BYTES VARBINARY"
+ + ", MSG_VALUE_BYTES VARBINARY NOT NULL]")
+
+ .returnsUnordered(
+ "MSG_PARTITION=0; MSG_TIMESTAMP=-1; MSG_OFFSET=0;
MSG_KEY_BYTES=mykey0; MSG_VALUE_BYTES=myvalue0",
+ "MSG_PARTITION=0; MSG_TIMESTAMP=-1; MSG_OFFSET=1"
+ + "; MSG_KEY_BYTES=mykey1; MSG_VALUE_BYTES=myvalue1")
+
+ .explainContains("PLAN=EnumerableInterpreter\n"
+ + " BindableTableScan(table=[[KAFKA, MOCKTABLE, (STREAM)]])\n");
+ }
+
+ @Test public void testFilterWithProject() {
+ assertModel(MODEL)
+ .query("SELECT STREAM MSG_PARTITION,MSG_OFFSET,MSG_VALUE_BYTES FROM
KAFKA.MOCKTABLE"
+ + " WHERE MSG_OFFSET>0")
+ .limit(1)
+
+ .returnsUnordered(
+ "MSG_PARTITION=0; MSG_OFFSET=1; MSG_VALUE_BYTES=myvalue1")
+ .explainContains(
+ "PLAN=EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0],
expr#6=[>($t2, $t5)], MSG_PARTITION=[$t0], MSG_OFFSET=[$t2],
MSG_VALUE_BYTES=[$t4], $condition=[$t6])\n"
+ + " EnumerableInterpreter\n"
+ + " BindableTableScan(table=[[KAFKA, MOCKTABLE,
(STREAM)]])");
+ }
+
+ @Test public void testCustRowConverter() {
+ assertModel(MODEL)
+ .query("SELECT STREAM * FROM KAFKA.MOCKTABLE_CUST_ROW_CONVERTER")
+ .limit(2)
+
+ .typeIs("[TOPIC_NAME VARCHAR NOT NULL"
+ + ", PARTITION_ID INTEGER NOT NULL"
+ + ", TIMESTAMP_TYPE VARCHAR]")
+
+ .returnsUnordered(
+ "TOPIC_NAME=testtopic; PARTITION_ID=0;
TIMESTAMP_TYPE=NoTimestampType",
+ "TOPIC_NAME=testtopic; PARTITION_ID=0;
TIMESTAMP_TYPE=NoTimestampType")
+
+ .explainContains("PLAN=EnumerableInterpreter\n"
+ + " BindableTableScan(table=[[KAFKA,
MOCKTABLE_CUST_ROW_CONVERTER, (STREAM)]])\n");
+ }
+
+
+ @Test public void testAsBatch() {
+ assertModel(MODEL)
+ .query("SELECT * FROM KAFKA.MOCKTABLE")
+ .failsAtValidation("Cannot convert stream 'MOCKTABLE' to relation");
+ }
+}
+// End KafkaAdapterTest.java
diff --git
a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java
new file mode 100644
index 0000000..cccbcfd
--- /dev/null
+++
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * A mock consumer to test Kafka adapter.
+ */
+public class KafkaMockConsumer extends MockConsumer {
+ public KafkaMockConsumer(final OffsetResetStrategy offsetResetStrategy) {
+ super(OffsetResetStrategy.EARLIEST);
+
+ assign(Arrays.asList(new TopicPartition("testtopic", 0)));
+
+ HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+ beginningOffsets.put(new TopicPartition("testtopic", 0), 0L);
+ updateBeginningOffsets(beginningOffsets);
+
+ for (int idx = 0; idx < 10; ++idx) {
+ addRecord(new ConsumerRecord<byte[], byte[]>("testtopic",
+ 0, idx, ("mykey" + idx).getBytes(StandardCharsets.UTF_8),
+ ("myvalue" + idx).getBytes(StandardCharsets.UTF_8)));
+ }
+ }
+}
+// End KafkaMockConsumer.java
diff --git
a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaRowConverterTest.java
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaRowConverterTest.java
new file mode 100644
index 0000000..16dc3cd
--- /dev/null
+++
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaRowConverterTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Implementation of {@link KafkaRowConverter} for test, both key and value
are saved as byte[].
+ */
+public class KafkaRowConverterTest implements KafkaRowConverter<String,
String> {
+ /**
+ * Generate row schema for a given Kafka topic.
+ *
+ * @param topicName, Kafka topic name;
+ * @return row type
+ */
+ @Override public RelDataType rowDataType(final String topicName) {
+ final RelDataTypeFactory typeFactory =
+ new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
+ fieldInfo.add("TOPIC_NAME",
typeFactory.createSqlType(SqlTypeName.VARCHAR)).nullable(false);
+ fieldInfo.add("PARTITION_ID",
typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
+ fieldInfo.add("TIMESTAMP_TYPE",
typeFactory.createSqlType(SqlTypeName.VARCHAR)).nullable(true);
+
+ return fieldInfo.build();
+ }
+
+ /**
+ * Parse and reformat Kafka message from consumer, to fit with row schema
+ * defined as {@link #rowDataType(String)}.
+ * @param message, the raw Kafka message record;
+ * @return fields in the row
+ */
+ @Override public Object[] toRow(final ConsumerRecord<String, String>
message) {
+ Object[] fields = new Object[3];
+ fields[0] = message.topic();
+ fields[1] = message.partition();
+ fields[2] = message.timestampType().name;
+
+ return fields;
+ }
+}
+// End KafkaRowConverterTest.java
diff --git a/kafka/src/test/resources/kafka.model.json
b/kafka/src/test/resources/kafka.model.json
new file mode 100644
index 0000000..a37d499
--- /dev/null
+++ b/kafka/src/test/resources/kafka.model.json
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "version": "1.0",
+ "defaultSchema": "KAFKA",
+ "schemas": [
+ {
+ "name": "KAFKA",
+ "tables": [
+ {
+ "name": "MOCKTABLE",
+ "type": "custom",
+ "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
+ "operand": {
+ "consumer.cust":
"org.apache.calcite.adapter.kafka.KafkaMockConsumer"
+ }
+ }
+ ,{
+ "name": "MOCKTABLE_CUST_ROW_CONVERTER",
+ "type": "custom",
+ "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
+ "operand": {
+ "consumer.cust":
"org.apache.calcite.adapter.kafka.KafkaMockConsumer",
+ "row.converter":
"org.apache.calcite.adapter.kafka.KafkaRowConverterTest",
+ "consumer.params": {
+ "key.deserializer":
"org.apache.kafka.common.serialization.StringDeserializer",
+ "value.deserializer":
"org.apache.kafka.common.serialization.StringDeserializer"
+ }
+ }
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 1b4825d..84f23df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,8 @@ limitations under the License.
<fmpp.version>0.9.16</fmpp.version>
<jsonpath.version>2.4.0</jsonpath.version>
+ <kafka.client.version>2.0.0</kafka.client.version>
+
<!-- Other properties (not version numbers) -->
<!-- Java 1.8 does not support -html5, so override via profile for Java
1.8 -->
<maven-javadoc-html5>-html5</maven-javadoc-html5>
@@ -189,6 +191,7 @@ limitations under the License.
<module>spark</module>
<module>splunk</module>
<module>ubenchmark</module>
+ <module>kafka</module>
</modules>
<!-- No dependencies here. Declare dependency VERSIONS in
@@ -608,6 +611,11 @@ limitations under the License.
<version>${jcip-annotations.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.client.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/site/_docs/adapter.md b/site/_docs/adapter.md
index b1c9dec..a857e91 100644
--- a/site/_docs/adapter.md
+++ b/site/_docs/adapter.md
@@ -42,6 +42,7 @@ presenting the data as tables within a schema.
* Spark adapter (<a href="{{ site.apiRoot
}}/org/apache/calcite/adapter/spark/package-summary.html">calcite-spark</a>)
* Splunk adapter (<a href="{{ site.apiRoot
}}/org/apache/calcite/adapter/splunk/package-summary.html">calcite-splunk</a>)
* Eclipse Memory Analyzer (MAT) adapter (<a
href="https://github.com/vlsi/mat-calcite-plugin">mat-calcite-plugin</a>)
+* [Apache Kafka adapter](kafka_adapter.html)
### Other language interfaces
diff --git a/site/_docs/kafka_adapter.md b/site/_docs/kafka_adapter.md
new file mode 100644
index 0000000..000a6d4
--- /dev/null
+++ b/site/_docs/kafka_adapter.md
@@ -0,0 +1,103 @@
+---
+layout: docs
+title: Kafka adapter
+permalink: /docs/kafka_adapter.html
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+**Note**:
+
+KafkaAdapter is an experimental feature, changes in public API and usage are
expected.
+
+For instructions on downloading and building Calcite, start with
the[tutorial]({{ site.baseurl }}/docs/tutorial.html).
+
+The Kafka adapter exposes an Apache Kafka topic as a STREAM table, so it can
be queried using
+[Calcite Stream SQL]({{ site.baseurl }}/docs/stream.html). Note that the
adapter will not attempt to scan all topics,
+instead users need to configure tables manually, one Kafka stream table is
mapping to one Kafka topic.
+
+A basic example of a model file is given below:
+
+{% highlight json %}
+{
+ "version": "1.0",
+ "defaultSchema": "KAFKA",
+ "schemas": [
+ {
+ "name": "KAFKA",
+ "tables": [
+ {
+ "name": "TABLE_NAME",
+ "type": "custom",
+ "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
+ "row.converter": "com.example.CustKafkaRowConverter",
+ "operand": {
+ "bootstrap.servers": "host1:port,host2:port",
+ "topic.name": "kafka.topic.name",
+ "consumer.params": {
+ "key.deserializer":
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+ "value.deserializer":
"org.apache.kafka.common.serialization.ByteArrayDeserializer"
+ }
+ }
+ }
+ ]
+ }
+ ]
+}
+{% endhighlight %}
+
+Note that:
+
+1. As Kafka message is schemaless, a [KafkaRowConverter]({{ site.apiRoot
}}/org/apache/calcite/adapter/kafka/KafkaRowConverter.html)
+ is required to specify row schema explicitly(with parameter `row.converter`),
and
+ how to decode Kafka message to Calcite row. [KafkaRowConverterImpl]({{
site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.html)
+ is used if not provided;
+
+2. More consumer settings can be added in parameter `consumer.params`;
+
+Assuming this file is stored as `kafka.model.json`, you can connect to Kafka
via
+[`sqlline`](https://github.com/julianhyde/sqlline) as follows:
+
+{% highlight bash %}
+$ ./sqlline
+sqlline> !connect jdbc:calcite:model=kafka.model.json admin admin
+{% endhighlight %}
+
+`sqlline` will now accept SQL queries which access your Kafka topics.
+
+With the Kafka table configured in above model. We can run a simple query to
fetch messages:
+
+{% highlight sql %}
+sqlline> SELECT STREAM *
+ FROM KAFKA.TABLE_NAME;
++---------------+---------------------+---------------------+---------------+-----------------+
+| MSG_PARTITION | MSG_TIMESTAMP | MSG_OFFSET | MSG_KEY_BYTES |
MSG_VALUE_BYTES |
++---------------+---------------------+---------------------+---------------+-----------------+
+| 0 | -1 | 0 | mykey0 |
myvalue0 |
+| 0 | -1 | 1 | mykey1 |
myvalue1 |
++---------------+---------------------+---------------------+---------------+-----------------+
+{% endhighlight %}
+
+Kafka table is a streaming table, which runs continuously, `LIMIT` can be
added to return fast as below:
+
+{% highlight sql %}
+sqlline> SELECT STREAM *
+ FROM KAFKA.TABLE_NAME
+ LIMIT 5;
+{% endhighlight %}
diff --git a/sqlline b/sqlline
index fc60c90..0dc6a76 100755
--- a/sqlline
+++ b/sqlline
@@ -37,7 +37,7 @@ if [ ! -f target/fullclasspath.txt ]; then
fi
CP=
-for module in core cassandra druid elasticsearch file mongodb server spark
splunk geode example/csv example/function; do
+for module in core cassandra druid elasticsearch file kafka mongodb server
spark splunk geode example/csv example/function; do
CP=${CP}${module}/target/classes:
CP=${CP}${module}/target/test-classes:
done