This is an automated email from the ASF dual-hosted git repository.

sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new ca6dc99  [CALCITE-2913] Adapter for Apache Kafka (Mingmin Xu) Expose 
an Apache Kafka topic as a stream table.
ca6dc99 is described below

commit ca6dc99a4625b9d2708f73b523da3dd41d71f73b
Author: mingmxu <[email protected]>
AuthorDate: Fri May 24 09:54:11 2019 -0700

    [CALCITE-2913] Adapter for Apache Kafka (Mingmin Xu)
    Expose an Apache Kafka topic as a stream table.
---
 kafka/pom.xml                                      |  99 ++++++++++++++++++
 .../adapter/kafka/KafkaMessageEnumerator.java      |  91 ++++++++++++++++
 .../calcite/adapter/kafka/KafkaRowConverter.java   |  51 +++++++++
 .../adapter/kafka/KafkaRowConverterImpl.java       |  68 ++++++++++++
 .../calcite/adapter/kafka/KafkaStreamTable.java    | 115 +++++++++++++++++++++
 .../calcite/adapter/kafka/KafkaTableConstants.java |  29 ++++++
 .../calcite/adapter/kafka/KafkaTableFactory.java   | 101 ++++++++++++++++++
 .../calcite/adapter/kafka/KafkaTableOptions.java   |  80 ++++++++++++++
 .../apache/calcite/adapter/kafka/package-info.java |  28 +++++
 .../calcite/adapter/kafka/KafkaAdapterTest.java    | 112 ++++++++++++++++++++
 .../calcite/adapter/kafka/KafkaMockConsumer.java   |  48 +++++++++
 .../adapter/kafka/KafkaRowConverterTest.java       |  63 +++++++++++
 kafka/src/test/resources/kafka.model.json          |  48 +++++++++
 pom.xml                                            |   8 ++
 site/_docs/adapter.md                              |   1 +
 site/_docs/kafka_adapter.md                        | 103 ++++++++++++++++++
 sqlline                                            |   2 +-
 17 files changed, 1046 insertions(+), 1 deletion(-)

diff --git a/kafka/pom.xml b/kafka/pom.xml
new file mode 100644
index 0000000..98658b7
--- /dev/null
+++ b/kafka/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.calcite</groupId>
+    <artifactId>calcite</artifactId>
+    <version>1.20.0-SNAPSHOT</version>
+  </parent>
+
+  <!-- The basics. -->
+  <artifactId>calcite-kafka</artifactId>
+  <packaging>jar</packaging>
+  <name>calcite kafka</name>
+  <description>Kafka Adapter. Exposes kafka topic(s) as stream table(s).</description>
+
+  <properties>
+    <top.dir>${project.basedir}/..</top.dir>
+    <build.timestamp>${maven.build.timestamp}</build.timestamp>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>commons-compiler</artifactId>
+          <groupId>org.codehaus.janino</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-linq4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>${maven-dependency-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/dependencies/</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git 
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java
 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java
new file mode 100644
index 0000000..fb9fb03
--- /dev/null
+++ 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.linq4j.Enumerator;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Enumerator that reads records from a Kafka {@link Consumer}
+ * and converts them into SQL rows with a {@link KafkaRowConverter}.
+ * @param <K> type of the Kafka message key,
+ *           refer to {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG}
+ * @param <V> type of the Kafka message value,
+ *           refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+public class KafkaMessageEnumerator<K, V> implements Enumerator<Object[]> {
+  final Consumer consumer;
+  final KafkaRowConverter<K, V> rowConverter;
+  private final AtomicBoolean cancelFlag;
+
+  // Records returned by earlier polls that have not been handed out yet.
+  private final LinkedList<ConsumerRecord<K, V>> bufferedRecords = new LinkedList<>();
+  private ConsumerRecord<K, V> curRecord;
+
+  KafkaMessageEnumerator(final Consumer consumer,
+      final KafkaRowConverter<K, V> rowConverter,
+      final AtomicBoolean cancelFlag) {
+    this.consumer = consumer;
+    this.rowConverter = rowConverter;
+    this.cancelFlag = cancelFlag;
+  }
+
+  /**
+   * Returns the current record as an array of objects, one element per row field.
+   */
+  @Override public Object[] current() {
+    return rowConverter.toRow(curRecord);
+  }
+
+  @Override public boolean moveNext() {
+    // Re-check the cancel flag before every poll; checking it only once
+    // would leave a cancelled query spinning forever on an idle topic.
+    while (!cancelFlag.get()) {
+      if (!bufferedRecords.isEmpty()) {
+        curRecord = bufferedRecords.removeFirst();
+        return true;
+      }
+      pullRecords();
+    }
+    return false;
+  }
+
+  private void pullRecords() {
+    ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
+    for (ConsumerRecord<K, V> record : records) {
+      bufferedRecords.add(record);
+    }
+  }
+
+  @Override public void reset() {
+    this.bufferedRecords.clear();
+    pullRecords();
+  }
+
+  @Override public void close() {
+    consumer.close();
+  }
+}
+// End KafkaMessageEnumerator.java
diff --git 
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverter.java 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverter.java
new file mode 100644
index 0000000..0f35cdf
--- /dev/null
+++ 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Interface to handle formatting between Kafka message and Calcite row.
+ *
+ * @param <K> type for Kafka message key,
+ *           refer to {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG}
+ * @param <V> type for Kafka message value,
+ *           refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ *
+ */
+public interface KafkaRowConverter<K, V> {
+
+  /**
+   * Generates the row type for a given Kafka topic.
+   *
+   * @param topicName Kafka topic name
+   * @return row type
+   */
+  RelDataType rowDataType(String topicName);
+
+  /**
+   * Parses and reformats a Kafka message from the consumer
+   * to align with the row type defined by {@link #rowDataType(String)}.
+   * @param message the raw Kafka message record
+   * @return fields in the row
+   */
+  Object[] toRow(ConsumerRecord<K, V> message);
+}
+// End KafkaRowConverter.java
diff --git 
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java
 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java
new file mode 100644
index 0000000..a659e73
--- /dev/null
+++ 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Default {@link KafkaRowConverter}; keys and values are exposed as raw byte[].
+ */
+public class KafkaRowConverterImpl implements KafkaRowConverter<byte[], byte[]> {
+  /**
+   * Builds the fixed five-column row type used for every topic.
+   *
+   * @param topicName Kafka topic name (not consulted by this implementation)
+   * @return row type
+   */
+  @Override public RelDataType rowDataType(final String topicName) {
+    final RelDataTypeFactory factory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    final RelDataType intType = factory.createSqlType(SqlTypeName.INTEGER);
+    final RelDataType bigintType = factory.createSqlType(SqlTypeName.BIGINT);
+    final RelDataType bytesType = factory.createSqlType(SqlTypeName.VARBINARY);
+    final RelDataTypeFactory.Builder row = factory.builder();
+    row.add("MSG_PARTITION", intType).nullable(false);
+    row.add("MSG_TIMESTAMP", bigintType).nullable(false);
+    row.add("MSG_OFFSET", bigintType).nullable(false);
+    row.add("MSG_KEY_BYTES", bytesType).nullable(true);
+    row.add("MSG_VALUE_BYTES", bytesType).nullable(false);
+    return row.build();
+  }
+
+  /**
+   * Copies the partition, timestamp, offset, key and value of a Kafka record
+   * into a row laid out as described by {@link #rowDataType(String)}.
+   * @param message the raw Kafka message record
+   * @return fields in the row
+   */
+  @Override public Object[] toRow(final ConsumerRecord<byte[], byte[]> message) {
+    return new Object[] {
+        message.partition(),
+        message.timestamp(),
+        message.offset(),
+        message.key(),
+        message.value()
+    };
+  }
+}
+// End KafkaRowConverterImpl.java
diff --git 
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java
new file mode 100644
index 0000000..ddacfc1
--- /dev/null
+++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaStreamTable.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Table backed by an Apache Kafka topic. It is scannable and is always
+ * reported as a STREAM table (see {@link #getJdbcTableType()}).
+ */
+public class KafkaStreamTable implements ScannableTable, StreamableTable {
+  final KafkaTableOptions tableOptions;
+
+  KafkaStreamTable(final KafkaTableOptions tableOptions) {
+    this.tableOptions = tableOptions;
+  }
+
+  @Override public Enumerable<Object[]> scan(final DataContext root) {
+    final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
+    return new AbstractEnumerable<Object[]>() {
+      public Enumerator<Object[]> enumerator() {
+        // A consumer supplied through the table options takes precedence.
+        final Consumer injected = tableOptions.getConsumer();
+        if (injected != null) {
+          return new KafkaMessageEnumerator(injected, tableOptions.getRowConverter(), cancelFlag);
+        }
+
+        final String byteArrayDeserializer =
+            "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+            tableOptions.getBootstrapServers());
+        // <byte[], byte[]> by default; the consumer params below may override it
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
+
+        if (tableOptions.getConsumerParams() != null) {
+          props.putAll(tableOptions.getConsumerParams());
+        }
+        final Consumer created = new KafkaConsumer<>(props);
+        created.subscribe(Collections.singletonList(tableOptions.getTopicName()));
+        return new KafkaMessageEnumerator(created, tableOptions.getRowConverter(), cancelFlag);
+      }
+    };
+  }
+
+  @Override public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+    return tableOptions.getRowConverter().rowDataType(tableOptions.getTopicName());
+  }
+
+  /** Returns statistics about this table. */
+  @Override public Statistic getStatistic() {
+    return Statistics.of(100d, ImmutableList.of(),
+        RelCollations.createSingleton(0));
+  }
+
+  @Override public boolean isRolledUp(final String column) {
+    return false;
+  }
+
+  @Override public boolean rolledUpColumnValidInsideAgg(final String column,
+      final SqlCall call, final SqlNode parent,
+      final CalciteConnectionConfig config) {
+    return false;
+  }
+
+  @Override public Table stream() {
+    return this;
+  }
+
+  /** Table type; this table always reports itself as a STREAM. */
+  @Override public Schema.TableType getJdbcTableType() {
+    return Schema.TableType.STREAM;
+  }
+}
+// End KafkaStreamTable.java
diff --git 
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableConstants.java 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableConstants.java
new file mode 100644
index 0000000..aa0582f
--- /dev/null
+++ 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+/**
+ * Keys of the "operand" map read by {@link KafkaTableFactory} to define a Kafka table.
+ */
+interface KafkaTableConstants {
+  String SCHEMA_TOPIC_NAME = "topic.name"; // name of the Kafka topic
+  String SCHEMA_BOOTSTRAP_SERVERS = "bootstrap.servers"; // Kafka bootstrap servers
+  String SCHEMA_ROW_CONVERTER = "row.converter"; // class name of a KafkaRowConverter
+  String SCHEMA_CUST_CONSUMER = "consumer.cust"; // class name of a custom Consumer
+  String SCHEMA_CONSUMER_PARAMS = "consumer.params"; // map of extra consumer settings
+}
+// End KafkaTableConstants.java
diff --git 
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableFactory.java 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableFactory.java
new file mode 100644
index 0000000..1f1252d
--- /dev/null
+++ 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TableFactory;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Implementation of {@link TableFactory} for Apache Kafka. Currently an Apache Kafka
+ * topic is mapped to a STREAM table.
+ */
+public class KafkaTableFactory implements TableFactory<KafkaStreamTable> {
+  public KafkaTableFactory() {
+  }
+
+  /** Creates a Table.
+   * @param schema Schema this table belongs to
+   * @param name Name of this table
+   * @param operand The "operand" JSON property
+   * @param rowType Row type of the "columns" JSON property; not used by this factory
+   * @throws RuntimeException if the configured row converter or consumer class is invalid
+   */
+  @Override public KafkaStreamTable create(SchemaPlus schema,
+      String name,
+      Map<String, Object> operand,
+      RelDataType rowType) {
+    KafkaTableOptions tableOptionBuilder = new KafkaTableOptions();
+
+    tableOptionBuilder.setBootstrapServers(
+        (String) operand.get(KafkaTableConstants.SCHEMA_BOOTSTRAP_SERVERS));
+    tableOptionBuilder.setTopicName(
+        (String) operand.get(KafkaTableConstants.SCHEMA_TOPIC_NAME));
+
+    KafkaRowConverter rowConverter = null;
+    if (operand.containsKey(KafkaTableConstants.SCHEMA_ROW_CONVERTER)) {
+      String rowConverterClass = (String) operand.get(KafkaTableConstants.SCHEMA_ROW_CONVERTER);
+      try {
+        // Class#newInstance is deprecated; a wrong-typed class is reported like a missing one.
+        rowConverter = (KafkaRowConverter)
+            Class.forName(rowConverterClass).getDeclaredConstructor().newInstance();
+      } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
+          | InstantiationException | InvocationTargetException | ClassCastException e) {
+        final String details = String.format(Locale.ROOT,
+            "Fail to create table '%s' with configuration: \n"
+                + "'%s'\n"
+                + "KafkaRowConverter '%s' is invalid",
+            name, operand, rowConverterClass);
+        throw new RuntimeException(details, e);
+      }
+    } else {
+      rowConverter = new KafkaRowConverterImpl();
+    }
+    tableOptionBuilder.setRowConverter(rowConverter);
+
+    if (operand.containsKey(KafkaTableConstants.SCHEMA_CONSUMER_PARAMS)) {
+      tableOptionBuilder.setConsumerParams((Map<String, String>) operand.get(
+          KafkaTableConstants.SCHEMA_CONSUMER_PARAMS));
+    }
+    if (operand.containsKey(KafkaTableConstants.SCHEMA_CUST_CONSUMER)) {
+      String custConsumerClass = (String) operand.get(KafkaTableConstants.SCHEMA_CUST_CONSUMER);
+      try {
+        tableOptionBuilder.setConsumer(
+            (Consumer) Class.forName(custConsumerClass)
+                .getConstructor(OffsetResetStrategy.class)
+                .newInstance(OffsetResetStrategy.NONE));
+      } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
+          | InstantiationException | InvocationTargetException | ClassCastException e) {
+        final String details = String.format(Locale.ROOT,
+            "Fail to create table '%s' with configuration: \n"
+                + "'%s'\n"
+                + "KafkaCustConsumer '%s' is invalid",
+            name, operand, custConsumerClass);
+        throw new RuntimeException(details, e);
+      }
+    }
+    return new KafkaStreamTable(tableOptionBuilder);
+  }
+}
+// End KafkaTableFactory.java
diff --git 
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableOptions.java 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableOptions.java
new file mode 100644
index 0000000..230689e
--- /dev/null
+++ 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaTableOptions.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+import java.util.Map;
+
+/**
+ * Mutable holder of the settings for a {@link KafkaStreamTable}; every setter
+ * returns {@code this} so that calls can be chained.
+ */
+public final class KafkaTableOptions {
+  private String bootstrapServers;
+  private String topicName;
+  private KafkaRowConverter rowConverter;
+  private Map<String, String> consumerParams;
+  // Optional pre-built consumer; added to inject MockConsumer for testing.
+  private Consumer consumer;
+
+  /** Returns the Kafka bootstrap servers. */
+  public String getBootstrapServers() { return bootstrapServers; }
+
+  /** Sets the Kafka bootstrap servers. */
+  public KafkaTableOptions setBootstrapServers(final String bootstrapServers) {
+    this.bootstrapServers = bootstrapServers;
+    return this;
+  }
+
+  /** Returns the name of the Kafka topic. */
+  public String getTopicName() { return topicName; }
+
+  /** Sets the name of the Kafka topic. */
+  public KafkaTableOptions setTopicName(final String topicName) {
+    this.topicName = topicName;
+    return this;
+  }
+
+  /** Returns the converter that turns Kafka records into rows. */
+  public KafkaRowConverter getRowConverter() { return rowConverter; }
+
+  /** Sets the converter that turns Kafka records into rows. */
+  public KafkaTableOptions setRowConverter(final KafkaRowConverter rowConverter) {
+    this.rowConverter = rowConverter;
+    return this;
+  }
+
+  /** Returns the extra consumer parameters, or null if none were set. */
+  public Map<String, String> getConsumerParams() { return consumerParams; }
+
+  /** Sets extra consumer parameters. */
+  public KafkaTableOptions setConsumerParams(final Map<String, String> consumerParams) {
+    this.consumerParams = consumerParams;
+    return this;
+  }
+
+  /** Returns the pre-built consumer, or null if none was set. */
+  public Consumer getConsumer() { return consumer; }
+
+  /** Sets a pre-built consumer. */
+  public KafkaTableOptions setConsumer(final Consumer consumer) {
+    this.consumer = consumer;
+    return this;
+  }
+}
+// End KafkaTableOptions.java
diff --git 
a/kafka/src/main/java/org/apache/calcite/adapter/kafka/package-info.java 
b/kafka/src/main/java/org/apache/calcite/adapter/kafka/package-info.java
new file mode 100644
index 0000000..a303c70
--- /dev/null
+++ b/kafka/src/main/java/org/apache/calcite/adapter/kafka/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka query provider.
+ *
+ * <p>One Kafka topic is mapped to one STREAM table.</p>
+ */
+@PackageMarker
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java
diff --git 
a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java 
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
new file mode 100644
index 0000000..79709cc
--- /dev/null
+++ b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaAdapterTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.test.CalciteAssert;
+
+import com.google.common.io.Resources;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+/**
+ * Unit test cases for Kafka adapter.
+ */
+public class KafkaAdapterTest {
+  protected static final URL MODEL = 
KafkaAdapterTest.class.getResource("/kafka.model.json");
+
+  private CalciteAssert.AssertThat assertModel(String model) {
+    // ensure that Schema from this instance is being used
+    model = model.replace(KafkaAdapterTest.class.getName(), 
KafkaAdapterTest.class.getName());
+
+    return CalciteAssert.that()
+        .withModel(model);
+  }
+
+  private CalciteAssert.AssertThat assertModel(URL url) {
+    Objects.requireNonNull(url, "url");
+    try {
+      return assertModel(Resources.toString(url, StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Test public void testSelect() {
+    assertModel(MODEL)
+        .query("SELECT STREAM * FROM KAFKA.MOCKTABLE")
+        .limit(2)
+
+        .typeIs("[MSG_PARTITION INTEGER NOT NULL"
+            + ", MSG_TIMESTAMP BIGINT NOT NULL"
+            + ", MSG_OFFSET BIGINT NOT NULL"
+            + ", MSG_KEY_BYTES VARBINARY"
+            + ", MSG_VALUE_BYTES VARBINARY NOT NULL]")
+
+        .returnsUnordered(
+            "MSG_PARTITION=0; MSG_TIMESTAMP=-1; MSG_OFFSET=0; 
MSG_KEY_BYTES=mykey0; MSG_VALUE_BYTES=myvalue0",
+            "MSG_PARTITION=0; MSG_TIMESTAMP=-1; MSG_OFFSET=1"
+                + "; MSG_KEY_BYTES=mykey1; MSG_VALUE_BYTES=myvalue1")
+
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  BindableTableScan(table=[[KAFKA, MOCKTABLE, (STREAM)]])\n");
+  }
+
+  @Test public void testFilterWithProject() {
+    assertModel(MODEL)
+        .query("SELECT STREAM MSG_PARTITION,MSG_OFFSET,MSG_VALUE_BYTES FROM 
KAFKA.MOCKTABLE"
+            + " WHERE MSG_OFFSET>0")
+        .limit(1)
+
+        .returnsUnordered(
+            "MSG_PARTITION=0; MSG_OFFSET=1; MSG_VALUE_BYTES=myvalue1")
+        .explainContains(
+            "PLAN=EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], 
expr#6=[>($t2, $t5)], MSG_PARTITION=[$t0], MSG_OFFSET=[$t2], 
MSG_VALUE_BYTES=[$t4], $condition=[$t6])\n"
+                + "  EnumerableInterpreter\n"
+                + "    BindableTableScan(table=[[KAFKA, MOCKTABLE, 
(STREAM)]])");
+  }
+
+  @Test public void testCustRowConverter() {
+    assertModel(MODEL)
+        .query("SELECT STREAM * FROM KAFKA.MOCKTABLE_CUST_ROW_CONVERTER")
+        .limit(2)
+
+        .typeIs("[TOPIC_NAME VARCHAR NOT NULL"
+            + ", PARTITION_ID INTEGER NOT NULL"
+            + ", TIMESTAMP_TYPE VARCHAR]")
+
+        .returnsUnordered(
+            "TOPIC_NAME=testtopic; PARTITION_ID=0; 
TIMESTAMP_TYPE=NoTimestampType",
+            "TOPIC_NAME=testtopic; PARTITION_ID=0; 
TIMESTAMP_TYPE=NoTimestampType")
+
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  BindableTableScan(table=[[KAFKA, 
MOCKTABLE_CUST_ROW_CONVERTER, (STREAM)]])\n");
+  }
+
+
+  @Test public void testAsBatch() {
+    assertModel(MODEL)
+        .query("SELECT * FROM KAFKA.MOCKTABLE")
+        .failsAtValidation("Cannot convert stream 'MOCKTABLE' to relation");
+  }
+}
+// End KafkaAdapterTest.java
diff --git 
a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java 
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java
new file mode 100644
index 0000000..cccbcfd
--- /dev/null
+++ 
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaMockConsumer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * A mock consumer to test Kafka adapter; always uses EARLIEST, ignoring the strategy passed in.
+ */
+public class KafkaMockConsumer extends MockConsumer {
+  public KafkaMockConsumer(final OffsetResetStrategy offsetResetStrategy) {
+    super(OffsetResetStrategy.EARLIEST);
+
+    assign(Arrays.asList(new TopicPartition("testtopic", 0)));
+
+    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+    beginningOffsets.put(new TopicPartition("testtopic", 0), 0L);
+    updateBeginningOffsets(beginningOffsets);
+
+    for (int idx = 0; idx < 10; ++idx) {
+      addRecord(new ConsumerRecord<byte[], byte[]>("testtopic",
+          0, idx, ("mykey" + idx).getBytes(StandardCharsets.UTF_8),
+          ("myvalue" + idx).getBytes(StandardCharsets.UTF_8)));
+    }
+  }
+}
+// End KafkaMockConsumer.java
diff --git 
a/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaRowConverterTest.java
 
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaRowConverterTest.java
new file mode 100644
index 0000000..16dc3cd
--- /dev/null
+++ 
b/kafka/src/test/java/org/apache/calcite/adapter/kafka/KafkaRowConverterTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Test {@link KafkaRowConverter} whose rows hold a message's topic, partition and timestamp type.
+ */
+public class KafkaRowConverterTest implements KafkaRowConverter<String, 
String> {
+  /**
+   * Generate row schema for a given Kafka topic.
+   *
+   * @param topicName Kafka topic name
+   * @return row type
+   */
+  @Override public RelDataType rowDataType(final String topicName) {
+    final RelDataTypeFactory typeFactory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
+    fieldInfo.add("TOPIC_NAME", 
typeFactory.createSqlType(SqlTypeName.VARCHAR)).nullable(false);
+    fieldInfo.add("PARTITION_ID", 
typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
+    fieldInfo.add("TIMESTAMP_TYPE", 
typeFactory.createSqlType(SqlTypeName.VARCHAR)).nullable(true);
+
+    return fieldInfo.build();
+  }
+
+  /**
+   * Parse and reformat Kafka message from consumer, to fit with row schema
+   * defined as {@link #rowDataType(String)}.
+   * @param message the raw Kafka message record
+   * @return fields in the row
+   */
+  @Override public Object[] toRow(final ConsumerRecord<String, String> 
message) {
+    Object[] fields = new Object[3];
+    fields[0] = message.topic();
+    fields[1] = message.partition();
+    fields[2] = message.timestampType().name;
+
+    return fields;
+  }
+}
+// End KafkaRowConverterTest.java
diff --git a/kafka/src/test/resources/kafka.model.json 
b/kafka/src/test/resources/kafka.model.json
new file mode 100644
index 0000000..a37d499
--- /dev/null
+++ b/kafka/src/test/resources/kafka.model.json
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "version": "1.0",
+  "defaultSchema": "KAFKA",
+  "schemas": [
+    {
+      "name": "KAFKA",
+      "tables": [
+        {
+          "name": "MOCKTABLE",
+          "type": "custom",
+          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
+          "operand": {
+            "consumer.cust": 
"org.apache.calcite.adapter.kafka.KafkaMockConsumer"
+          }
+        }
+        ,{
+          "name": "MOCKTABLE_CUST_ROW_CONVERTER",
+          "type": "custom",
+          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
+          "operand": {
+            "consumer.cust": 
"org.apache.calcite.adapter.kafka.KafkaMockConsumer",
+            "row.converter": 
"org.apache.calcite.adapter.kafka.KafkaRowConverterTest",
+            "consumer.params": {
+              "key.deserializer": 
"org.apache.kafka.common.serialization.StringDeserializer",
+              "value.deserializer": 
"org.apache.kafka.common.serialization.StringDeserializer"
+            }
+          }
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 1b4825d..84f23df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,8 @@ limitations under the License.
     <fmpp.version>0.9.16</fmpp.version>
     <jsonpath.version>2.4.0</jsonpath.version>
 
+    <kafka.client.version>2.0.0</kafka.client.version>
+
     <!-- Other properties (not version numbers) -->
     <!-- Java 1.8 does not support -html5, so override via profile for Java 
1.8 -->
     <maven-javadoc-html5>-html5</maven-javadoc-html5>
@@ -189,6 +191,7 @@ limitations under the License.
     <module>spark</module>
     <module>splunk</module>
     <module>ubenchmark</module>
+    <module>kafka</module>
   </modules>
 
   <!-- No dependencies here. Declare dependency VERSIONS in
@@ -608,6 +611,11 @@ limitations under the License.
         <version>${jcip-annotations.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>${kafka.client.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/site/_docs/adapter.md b/site/_docs/adapter.md
index b1c9dec..a857e91 100644
--- a/site/_docs/adapter.md
+++ b/site/_docs/adapter.md
@@ -42,6 +42,7 @@ presenting the data as tables within a schema.
 * Spark adapter (<a href="{{ site.apiRoot 
}}/org/apache/calcite/adapter/spark/package-summary.html">calcite-spark</a>)
 * Splunk adapter (<a href="{{ site.apiRoot 
}}/org/apache/calcite/adapter/splunk/package-summary.html">calcite-splunk</a>)
 * Eclipse Memory Analyzer (MAT) adapter (<a 
href="https://github.com/vlsi/mat-calcite-plugin">mat-calcite-plugin</a>)
+* [Apache Kafka adapter](kafka_adapter.html)
 
 ### Other language interfaces
 
diff --git a/site/_docs/kafka_adapter.md b/site/_docs/kafka_adapter.md
new file mode 100644
index 0000000..000a6d4
--- /dev/null
+++ b/site/_docs/kafka_adapter.md
@@ -0,0 +1,103 @@
+---
+layout: docs
+title: Kafka adapter
+permalink: /docs/kafka_adapter.html
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+  
+**Note**:
+
+The Kafka adapter is an experimental feature; changes to its public API and usage are expected.
+
+For instructions on downloading and building Calcite, start with the [tutorial]({{ site.baseurl }}/docs/tutorial.html).
+
+The Kafka adapter exposes an Apache Kafka topic as a STREAM table, so it can 
be queried using
+[Calcite Stream SQL]({{ site.baseurl }}/docs/stream.html). Note that the adapter will not attempt to scan all topics;
+instead, users need to configure tables manually, and each Kafka stream table maps to one Kafka topic.
+
+A basic example of a model file is given below:
+
+{% highlight json %}
+{
+  "version": "1.0",
+  "defaultSchema": "KAFKA",
+  "schemas": [
+    {
+      "name": "KAFKA",
+      "tables": [
+        {
+          "name": "TABLE_NAME",
+          "type": "custom",
+          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
+          "operand": {
+            "bootstrap.servers": "host1:port,host2:port",
+            "topic.name": "kafka.topic.name",
+            "row.converter": "com.example.CustKafkaRowConverter",
+            "consumer.params": {
+              "key.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+              "value.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            }
+          }
+        }
+      ]
+    }
+  ]
+}
+{% endhighlight %}
+
+Note that:
+
+1. As Kafka messages are schemaless, a [KafkaRowConverter]({{ site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverter.html)
+ is required to specify the row schema explicitly (with parameter `row.converter`), and
+ how to decode a Kafka message into a Calcite row. [KafkaRowConverterImpl]({{ site.apiRoot }}/org/apache/calcite/adapter/kafka/KafkaRowConverterImpl.html)
+ is used if none is provided;
+
+2. More consumer settings can be added in parameter `consumer.params`;
+
+Assuming this file is stored as `kafka.model.json`, you can connect to Kafka 
via
+[`sqlline`](https://github.com/julianhyde/sqlline) as follows:
+
+{% highlight bash %}
+$ ./sqlline
+sqlline> !connect jdbc:calcite:model=kafka.model.json admin admin
+{% endhighlight %}
+
+`sqlline` will now accept SQL queries which access your Kafka topics.
+
+With the Kafka table configured in the model above, we can run a simple query to fetch messages:
+
+{% highlight sql %}
+sqlline> SELECT STREAM *
+         FROM KAFKA.TABLE_NAME;
++---------------+---------------------+---------------------+---------------+-----------------+
+| MSG_PARTITION |    MSG_TIMESTAMP    |     MSG_OFFSET      | MSG_KEY_BYTES | 
MSG_VALUE_BYTES |
++---------------+---------------------+---------------------+---------------+-----------------+
+| 0             | -1                  | 0                   | mykey0        | 
myvalue0        |
+| 0             | -1                  | 1                   | mykey1        | 
myvalue1        |
++---------------+---------------------+---------------------+---------------+-----------------+
+{% endhighlight %}
+
+A Kafka table is a streaming table, so a query on it runs continuously; `LIMIT` can be added to make it return quickly, as below:
+
+{% highlight sql %}
+sqlline> SELECT STREAM *
+         FROM KAFKA.TABLE_NAME
+         LIMIT 5;
+{% endhighlight %}
diff --git a/sqlline b/sqlline
index fc60c90..0dc6a76 100755
--- a/sqlline
+++ b/sqlline
@@ -37,7 +37,7 @@ if [ ! -f target/fullclasspath.txt ]; then
 fi
 
 CP=
-for module in core cassandra druid elasticsearch file mongodb server spark 
splunk geode example/csv example/function; do
+for module in core cassandra druid elasticsearch file kafka mongodb server 
spark splunk geode example/csv example/function; do
   CP=${CP}${module}/target/classes:
   CP=${CP}${module}/target/test-classes:
 done

Reply via email to