Initial version of Table API

Initial version of the Table API. It includes:
 - Core table API (Table, TableDescriptor, TableSpec)
 - Local table implementation for in-memory and RocksDb
 - The sendTo() and stream-table join operators

nickpan47 xinyuiscool prateekm could you help review?

Author: Wei Song <[email protected]>

Reviewers: Yi Pan <[email protected]>, Christopher Pettitt 
<[email protected]>

Closes #349 from weisong44/table-api-14


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e74998c5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e74998c5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e74998c5

Branch: refs/heads/master
Commit: e74998c5e58c891c0744ce1be2b13f7c5861656a
Parents: e3efdf5
Author: Wei Song <[email protected]>
Authored: Tue Dec 5 13:23:44 2017 -0800
Committer: Prateek Maheshwari <[email protected]>
Committed: Tue Dec 5 13:23:44 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |   9 +
 .../java/org/apache/samza/operators/KV.java     |   2 +-
 .../apache/samza/operators/MessageStream.java   |  51 +++-
 .../org/apache/samza/operators/StreamGraph.java |  15 +
 .../apache/samza/operators/TableDescriptor.java |  73 +++++
 .../functions/StreamTableJoinFunction.java      |  59 ++++
 .../table/LocalStoreBackedTableProvider.java    |  37 +++
 .../org/apache/samza/table/ReadWriteTable.java  |  75 +++++
 .../org/apache/samza/table/ReadableTable.java   |  61 ++++
 .../main/java/org/apache/samza/table/Table.java |  31 ++
 .../org/apache/samza/table/TableProvider.java   |  57 ++++
 .../samza/table/TableProviderFactory.java       |  35 +++
 .../java/org/apache/samza/table/TableSpec.java  | 125 ++++++++
 .../java/org/apache/samza/task/TaskContext.java |   6 +-
 .../apache/samza/config/JavaTableConfig.java    |  87 ++++++
 .../apache/samza/container/TaskContextImpl.java |  24 +-
 .../samza/execution/ExecutionPlanner.java       |   5 +
 .../org/apache/samza/execution/JobGraph.java    |  16 +
 .../samza/execution/JobGraphJsonGenerator.java  |  50 +++
 .../org/apache/samza/execution/JobNode.java     |  54 +++-
 .../samza/operators/BaseTableDescriptor.java    |  94 ++++++
 .../samza/operators/MessageStreamImpl.java      |  31 +-
 .../apache/samza/operators/StreamGraphImpl.java |  40 ++-
 .../org/apache/samza/operators/TableImpl.java   |  40 +++
 .../samza/operators/impl/OperatorImplGraph.java |  30 +-
 .../operators/impl/SendToTableOperatorImpl.java |  71 +++++
 .../impl/StreamTableJoinOperatorImpl.java       |  82 +++++
 .../samza/operators/spec/OperatorSpec.java      |  12 +-
 .../samza/operators/spec/OperatorSpecs.java     |  44 ++-
 .../operators/spec/SendToTableOperatorSpec.java |  65 ++++
 .../spec/StreamTableJoinOperatorSpec.java       |  67 ++++
 .../org/apache/samza/table/TableManager.java    | 153 ++++++++++
 .../apache/samza/container/SamzaContainer.scala |  75 +++--
 .../apache/samza/container/TaskInstance.scala   |  40 ++-
 .../samza/config/TestJavaTableConfig.java       |  58 ++++
 .../samza/operators/TestMessageStreamImpl.java  |  68 ++++-
 .../samza/operators/TestStreamGraphImpl.java    |  25 +-
 .../impl/TestStreamTableJoinOperatorImpl.java   | 101 ++++++
 .../apache/samza/table/TestTableManager.java    | 176 +++++++++++
 .../org/apache/samza/task/TestAsyncRunLoop.java |  14 +-
 .../kv/inmemory/InMemoryTableDescriptor.java    |  59 ++++
 .../kv/inmemory/InMemoryTableProvider.java      |  65 ++++
 .../inmemory/InMemoryTableProviderFactory.java  |  33 ++
 .../inmemory/TestInMemoryTableDescriptor.java   |  48 +++
 .../kv/inmemory/TestInMemoryTableProvider.java  |  65 ++++
 .../storage/kv/RocksDbTableDescriptor.java      | 232 ++++++++++++++
 .../samza/storage/kv/RocksDbTableProvider.java  |  64 ++++
 .../storage/kv/RocksDbTableProviderFactory.java |  31 ++
 .../storage/kv/TestRocksDbTableDescriptor.java  |  87 ++++++
 .../storage/kv/TestRocksDbTableProvider.java    |  66 ++++
 .../kv/BaseLocalStoreBackedTableDescriptor.java |  56 ++++
 .../kv/BaseLocalStoreBackedTableProvider.java   |  92 ++++++
 .../kv/LocalStoreBackedReadWriteTable.java      |  68 +++++
 .../kv/LocalStoreBackedReadableTable.java       |  61 ++++
 .../TestLocalBaseStoreBackedTableProvider.java  |  77 +++++
 .../apache/samza/test/table/TestLocalTable.java | 304 +++++++++++++++++++
 .../apache/samza/test/table/TestTableData.java  | 200 ++++++++++++
 .../samza/test/util/ArraySystemConsumer.java    |   4 +-
 .../samza/test/util/SimpleSystemAdmin.java      |  26 +-
 59 files changed, 3668 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 50cc5e0..be1baf7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -565,6 +565,15 @@ project(":samza-kv_$scalaVersion") {
 project(":samza-kv-inmemory_$scalaVersion") {
   apply plugin: 'scala'
 
+  // Force scala joint compilation
+  sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.test.scala.srcDir "src/test/java"
+
+  // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+  // tasks.compileTestJava.enabled = false
+  sourceSets.main.java.srcDirs = []
+  sourceSets.test.java.srcDirs = []
+
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/KV.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/KV.java 
b/samza-api/src/main/java/org/apache/samza/operators/KV.java
index 0bed3b9..824bcb4 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/KV.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/KV.java
@@ -25,7 +25,7 @@ package org.apache.samza.operators;
  * @param <K> type of the key
  * @param <V> type of the value
  */
-public class KV<K, V> {
+public final class KV<K, V> {
   public final K key;
   public final V value;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java 
b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index acb2f33..f0a5526 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -18,21 +18,23 @@
  */
 package org.apache.samza.operators;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Function;
+
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.Function;
+import org.apache.samza.table.Table;
 
 
 /**
@@ -154,6 +156,34 @@ public interface MessageStream<M> {
       Duration ttl, String id);
 
   /**
+   * Joins this {@link MessageStream} with a {@link Table} using the provided
+   * pairwise {@link StreamTableJoinFunction}.
+   * <p>
+   * The type of input message is expected to be {@link KV}.
+   * <p>
+   * Records are looked up from the joined table using the join key, join 
function
+   * is applied and join results are emitted as matches are found.
+   * <p>
+   * The join function allows implementation of both inner and left outer 
join. A null will be
+   * passed to the join function, if no record matching the join key is found 
in the table.
+   * The join function can choose to return an instance of JM (left outer join) or null
+   * (inner join); if null is returned, it won't be processed further.
+   * <p>
+   * Both the input stream and table being joined must have the same number of 
partitions,
+   * and should be partitioned by the same join key.
+   * <p>
+   *
+   * @param table the table being joined
+   * @param joinFn the join function
+   * @param <K> the type of join key
+   * @param <R> the type of table record
+   * @param <JM> the type of messages resulting from the {@code joinFn}
+   * @return the joined {@link MessageStream}
+   */
+  <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
+      StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> 
joinFn);
+
+  /**
    * Merges all {@code otherStreams} with this {@link MessageStream}.
    * <p>
    * The merged stream contains messages from all streams in the order they 
arrive.
@@ -235,4 +265,15 @@ public interface MessageStream<M> {
    */
   <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> 
keyExtractor,
       Function<? super M, ? extends V> valueExtractor, String id);
+
+  /**
+   * Sends messages in this {@link MessageStream} to a {@link Table}. The type 
of input message is expected
+   * to be {@link KV}, otherwise a {@link ClassCastException} will be thrown.
+   *
+   * @param table the table to write messages to
+   * @param <K> the type of key in the table
+   * @param <V> the type of record value in the table
+   */
+  <K, V> void sendTo(Table<KV<K, V>> table);
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java 
b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index 4930631..6871bc7 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.Table;
 
 
 /**
@@ -121,6 +122,20 @@ public interface StreamGraph {
   <M> OutputStream<M> getOutputStream(String streamId);
 
   /**
+   * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
+   * <p>
+   * Multiple invocations of this method with the same {@link TableDescriptor} 
will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param tableDesc the {@link TableDescriptor}
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   * @return the {@link Table} corresponding to the {@code tableDesc}
+   * @throws IllegalStateException when invoked multiple times with the same 
{@link TableDescriptor}
+   */
+  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc);
+
+  /**
    * Sets the {@link ContextManager} for this {@link StreamGraph}.
    * <p>
    * The provided {@link ContextManager} can be used to setup shared context 
between the operator functions

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java 
b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
new file mode 100644
index 0000000..a60b6a9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.serializers.KVSerde;
+
+/**
+ * User-facing interface to collect metadata that fully describes a
+ * Samza table. This interface should be implemented by concrete table 
implementations.
+ * <p>
+ * Typical user code should look like the following, notice 
<code>withConfig()</code>
+ * is defined in this class and the rest in subclasses.
+ *
+ * <pre>
+ * {@code
+ * TableDescriptor<Integer, String, ?> tableDesc = new 
RocksDbTableDescriptor("tbl")
+ *     .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
+ *     .withBlockSize(1024)
+ *     .withConfig("some-key", "some-value");
+ * }
+ * </pre>
+
+ * Once constructed, a table descriptor can be registered with the system. 
Internally,
+ * the table descriptor is then converted to a {@link 
org.apache.samza.table.TableSpec},
+ * which is used to track tables internally.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
[email protected]
+public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
+
+  /**
+   * Get the Id of the table
+   * @return Id of the table
+   */
+  String getTableId();
+
+  /**
+   * Set the Serde for this table
+   * @param serde the serde
+   * @return this table descriptor instance
+   * @throws IllegalArgumentException if null is provided
+   */
+  D withSerde(KVSerde<K, V> serde);
+
+  /**
+   * Add a configuration entry for the table
+   * @param key the key
+   * @param value the value
+   * @return this table descriptor instance
+   */
+  D withConfig(String key, String value);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
new file mode 100644
index 0000000..6afcf67
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Joins incoming messages with records from a table by the join key.
+ *
+ * @param <K> the type of join key
+ * @param <M>  type of input message
+ * @param <R>  type of the table record
+ * @param <JM> type of join results
+ */
[email protected]
+public interface StreamTableJoinFunction<K, M, R, JM> extends 
InitableFunction, ClosableFunction {
+
+  /**
+   * Joins the provided message and table record, and returns the joined message.
+   *
+   * @param message  the input message
+   * @param record  the table record value
+   * @return  the join result
+   */
+  JM apply(M message, R record);
+
+  /**
+   * Retrieve the join key from incoming messages
+   *
+   * @param message incoming message
+   * @return the join key
+   */
+  K getMessageKey(M message);
+
+  /**
+   * Retrieve the join key from table record
+   *
+   * @param record table record
+   * @return the join key
+   */
+  K getRecordKey(R record);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
 
b/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
new file mode 100644
index 0000000..21630ab
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import org.apache.samza.storage.StorageEngine;
+
+
+/**
+ * Interface for tables backed by Samza local stores. The backing stores are
+ * injected during initialization of the table. Since the lifecycle
+ * of the underlying stores is already managed by the Samza container,
+ * the table provider will not manage the lifecycle of the backing
+ * stores.
+ */
+public interface LocalStoreBackedTableProvider extends TableProvider {
+  /**
+   * Initializes the table provider with the backing store
+   * @param store the backing store
+   */
+  void init(StorageEngine store);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java 
b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
new file mode 100644
index 0000000..d617153
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.List;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.kv.Entry;
+
+/**
+ *
+ * A table that supports get, put and delete by one or more keys
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
[email protected]
+public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
+
+  /**
+   * Updates the mapping of the specified key-value pair; associates the specified {@code key} with the specified {@code value}.
+   *
+   * @param key the key with which the specified {@code value} is to be 
associated.
+   * @param value the value with which the specified {@code key} is to be 
associated.
+   * @throws NullPointerException if the specified {@code key} or {@code 
value} is {@code null}.
+   */
+  void put(K key, V value);
+
+  /**
+   * Updates the mappings of the specified key-value {@code entries}.
+   *
+   * @param entries the updated mappings to put into this table.
+   * @throws NullPointerException if any of the specified {@code entries} has 
{@code null} as key or value.
+   */
+  void putAll(List<Entry<K, V>> entries);
+
+  /**
+   * Deletes the mapping for the specified {@code key} from this table (if 
such mapping exists).
+   *
+   * @param key the key for which the mapping is to be deleted.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  void delete(K key);
+
+  /**
+   * Deletes the mappings for the specified {@code keys} from this table.
+   *
+   * @param keys the keys for which the mappings are to be deleted.
+   * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
+   */
+  void deleteAll(List<K> keys);
+
+
+  /**
+   * Flushes the underlying store of this table, if applicable.
+   */
+  void flush();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java 
b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
new file mode 100644
index 0000000..5ad6e0f
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.KV;
+
+
+/**
+ *
+ * A table that supports get by one or more keys
+ *
+ * @param <K> the type of the record key in this table
+ * @param <V> the type of the record value in this table
+ */
[email protected]
+public interface ReadableTable<K, V> extends Table<KV<K, V>> {
+
+  /**
+   * Gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return if found, the value associated with the specified {@code key}; 
otherwise, {@code null}.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  V get(K key);
+
+  /**
+   * Gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return a map of the keys that were found and their respective values.
+   * @throws NullPointerException if the specified {@code keys} list, or any 
of the keys, is {@code null}.
+   */
+  Map<K, V> getAll(List<K> keys);
+
+  /**
+   * Close the table and release any resources acquired
+   */
+  void close();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/Table.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/Table.java 
b/samza-api/src/main/java/org/apache/samza/table/Table.java
new file mode 100644
index 0000000..767e176
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/Table.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *
+ * Marker interface for a table.
+ *
+ * @param <R> the type of records in the table
+ */
[email protected]
+public interface Table<R> {
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java 
b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
new file mode 100644
index 0000000..54c6f5d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.Map;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * A table provider provides the implementation for a table. It ensures a 
table is
+ * properly constructed and also manages its lifecycle.
+ */
[email protected]
+public interface TableProvider {
+  /**
+   * Get an instance of the table for read/write operations
+   * @return the underlying table
+   */
+  Table getTable();
+
+  /**
+   * Generate any configuration for this table. The generated configuration
+   * is used by the Samza container to construct this table and any
+   * components necessary.
+   *
+   * @param config the current configuration
+   * @return configuration for this table
+   */
+  Map<String, String> generateConfig(Map<String, String> config);
+
+  /**
+   * Start the underlying table
+   */
+  void start();
+
+  /**
+   * Stop the underlying table
+   */
+  void stop();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java 
b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
new file mode 100644
index 0000000..1bb0196
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Factory of a table provider object
+ */
[email protected]
+public interface TableProviderFactory {
+  /**
+   * Constructs an instance of the table provider based on a given table spec
+   * @param tableSpec the table spec
+   * @return the table provider
+   */
+  TableProvider getTableProvider(TableSpec tableSpec);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java 
b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
new file mode 100644
index 0000000..68043f9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.serializers.KVSerde;
+
+
+/**
+ * TableSpec is a blueprint for creating, validating, or simply describing a 
table in the runtime environment.
+ *
+ * It is typically created indirectly by constructing an instance of {@link 
org.apache.samza.operators.TableDescriptor},
+ * and then invoking <code>BaseTableDescriptor.getTableSpec()</code>.
+ *
+ * It has specific attributes for common behaviors that Samza uses.
+ *
+ * It has the table provider factory, which provides the actual table 
implementation.
+ *
+ * It also includes a map of configurations which may be 
implementation-specific.
+ *
+ * It is immutable by design.
+ */
[email protected]
+public class TableSpec {
+
+  private final String id;
+  private final KVSerde serde;
+  private final String tableProviderFactoryClassName;
+  private final Map<String, String> config = new HashMap<>();
+
+  /**
+   * Default constructor
+   */
+  public TableSpec() {
+    this.id = null;
+    this.serde = null;
+    this.tableProviderFactoryClassName = null;
+  }
+
+  /**
+   * Constructs a {@link TableSpec}
+   *
+   * @param tableId Id of the table
+   * @param tableProviderFactoryClassName table provider factory
+   * @param serde the serde
+   * @param config implementation specific configuration
+   */
+  public TableSpec(String tableId, KVSerde serde, String 
tableProviderFactoryClassName,
+      Map<String, String> config) {
+    this.id = tableId;
+    this.serde = serde;
+    this.tableProviderFactoryClassName = tableProviderFactoryClassName;
+    this.config.putAll(config);
+  }
+
+  /**
+   * Get the Id of the table
+   * @return Id of the table
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Get the serde
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   * @return the key-value serde
+   */
+  public <K, V> KVSerde<K, V> getSerde() {
+    return serde;
+  }
+
+  /**
+   * Get the class name of the table provider factory
+   * @return class name of the table provider factory
+   */
+  public String getTableProviderFactoryClassName() {
+    return tableProviderFactoryClassName;
+  }
+
+  /**
+   * Get implementation configuration for the table
+   * @return configuration for the table
+   */
+  public Map<String, String> getConfig() {
+    return Collections.unmodifiableMap(config);
+  }
+
+  /**
+   * Two table specs are equal when they are of the same class and have the same Id.
+   * A null Id (as set by the default constructor) only equals another null Id.
+   *
+   * @param o the object to compare with
+   * @return true if the other object is a table spec of the same class with the same Id
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || !getClass().equals(o.getClass())) {
+      return false;
+    }
+    String otherId = ((TableSpec) o).id;
+    // id is null when the default constructor was used, so compare null-safely
+    return id == null ? otherId == null : id.equals(otherId);
+  }
+
+  /**
+   * Hash code derived from the table Id only, consistent with {@link #equals(Object)}.
+   *
+   * @return hash code of the Id, or 0 when the Id is null
+   */
+  @Override
+  public int hashCode() {
+    // id is null when the default constructor was used
+    return id == null ? 0 : id.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java 
b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 4ef3d30..11ffacc 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -19,12 +19,14 @@
 
 package org.apache.samza.task;
 
+import java.util.Set;
+
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.table.Table;
 
-import java.util.Set;
 
 /**
  * A TaskContext provides resources about the {@link 
org.apache.samza.task.StreamTask}, particularly during
@@ -37,6 +39,8 @@ public interface TaskContext {
 
   Object getStore(String name);
 
+  Table getTable(String tableId);
+
   TaskName getTaskName();
 
   SamzaContainerContext getSamzaContainerContext();

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
new file mode 100644
index 0000000..6cc3986
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A helper class for handling table configuration
+ */
+public class JavaTableConfig extends MapConfig {
+
+  // Prefix
+  public static final String TABLES_PREFIX = "tables.";
+  public static final String TABLE_ID_PREFIX = TABLES_PREFIX + "%s";
+
+  // Suffix
+  public static final String TABLE_PROVIDER_FACTORY_SUFFIX = 
".provider.factory";
+
+  // Config keys
+  public static final String TABLE_PROVIDER_FACTORY = 
String.format("%s.provider.factory", TABLE_ID_PREFIX);
+  public static final String TABLE_KEY_SERDE = String.format("%s.key.serde", 
TABLE_ID_PREFIX);
+  public static final String TABLE_VALUE_SERDE = 
String.format("%s.value.serde", TABLE_ID_PREFIX);
+
+
+  public JavaTableConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * Get Id's of all tables, i.e. every Id that has a
+   * <code>tables.&lt;tableId&gt;.provider.factory</code> key configured.
+   * @return list of distinct table Id's, in no particular order
+   */
+  public List<String> getTableIds() {
+    // NOTE(review): subset(prefix, true) is expected to return keys with the prefix removed
+    Config subConfig = subset(TABLES_PREFIX, true);
+    Set<String> tableNames = subConfig.keySet().stream()
+        .filter(k -> k.endsWith(TABLE_PROVIDER_FACTORY_SUFFIX))
+        // Strip the suffix instead of cutting at the first ".", so Id's containing dots stay intact
+        .map(k -> k.substring(0, k.length() - TABLE_PROVIDER_FACTORY_SUFFIX.length()))
+        .collect(Collectors.toSet());
+    return new LinkedList<>(tableNames);
+  }
+
+  /**
+   * Get the {@link org.apache.samza.table.TableProviderFactory} class for a 
table
+   * @param tableId Id of the table
+   * @return the {@link org.apache.samza.table.TableProviderFactory} class name
+   */
+  public String getTableProviderFactory(String tableId) {
+    return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);
+  }
+
+  /**
+   * Get registry keys of key serde for this table
+   * @param tableId Id of the table
+   * @return serde registry key
+   */
+  public String getKeySerde(String tableId) {
+    return get(String.format(TABLE_KEY_SERDE, tableId), null);
+  }
+
+  /**
+   * Get registry keys of value serde for this table
+   * @param tableId Id of the table
+   * @return serde registry key
+   */
+  public String getValueSerde(String tableId) {
+    return get(String.format(TABLE_VALUE_SERDE, tableId), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java 
b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
index aa622a3..0248486 100644
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -19,7 +19,10 @@
 
 package org.apache.samza.container;
 
-import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
@@ -27,13 +30,13 @@ import org.apache.samza.storage.TaskStorageManager;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableManager;
 import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import com.google.common.collect.ImmutableSet;
 
 public class TaskContextImpl implements TaskContext {
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskContextImpl.class);
@@ -44,6 +47,7 @@ public class TaskContextImpl implements TaskContext {
   private final Set<SystemStreamPartition> systemStreamPartitions;
   private final OffsetManager offsetManager;
   private final TaskStorageManager storageManager;
+  private final TableManager tableManager;
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
   private final Map<String, Object> objectRegistry = new HashMap<>();
@@ -56,6 +60,7 @@ public class TaskContextImpl implements TaskContext {
                          Set<SystemStreamPartition> systemStreamPartitions,
                          OffsetManager offsetManager,
                          TaskStorageManager storageManager,
+                         TableManager tableManager,
                          JobModel jobModel,
                          StreamMetadataCache streamMetadataCache) {
     this.taskName = taskName;
@@ -64,6 +69,7 @@ public class TaskContextImpl implements TaskContext {
     this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions);
     this.offsetManager = offsetManager;
     this.storageManager = storageManager;
+    this.tableManager = tableManager;
     this.jobModel = jobModel;
     this.streamMetadataCache = streamMetadataCache;
   }
@@ -89,6 +95,16 @@ public class TaskContextImpl implements TaskContext {
   }
 
   @Override
+  public Table getTable(String tableId) {
+    if (tableManager != null) {
+      return tableManager.getTable(tableId);
+    } else {
+      LOG.warn("No table manager found");
+      return null;
+    }
+  }
+
+  @Override
   public TaskName getTaskName() {
     return taskName;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 468aab9..e2c122a 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -39,6 +39,7 @@ import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,6 +96,7 @@ public class ExecutionPlanner {
     Set<StreamSpec> sourceStreams = new 
HashSet<>(streamGraph.getInputOperators().keySet());
     Set<StreamSpec> sinkStreams = new 
HashSet<>(streamGraph.getOutputStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
+    Set<TableSpec> tables = new HashSet<>(streamGraph.getTables().keySet());
     intStreams.retainAll(sinkStreams);
     sourceStreams.removeAll(intStreams);
     sinkStreams.removeAll(intStreams);
@@ -113,6 +115,9 @@ public class ExecutionPlanner {
     // add intermediate streams
     intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, 
node));
 
+    // add tables
+    tables.forEach(spec -> jobGraph.addTable(spec, node));
+
     jobGraph.validate();
 
     return jobGraph;

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 2a09e90..4a09260 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -30,11 +30,13 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +57,7 @@ import org.slf4j.LoggerFactory;
   private final Set<StreamEdge> sources = new HashSet<>();
   private final Set<StreamEdge> sinks = new HashSet<>();
   private final Set<StreamEdge> intermediateStreams = new HashSet<>();
+  private final Set<TableSpec> tables = new HashSet<>();
   private final Config config;
   private final JobGraphJsonGenerator jsonGenerator = new 
JobGraphJsonGenerator();
 
@@ -86,6 +89,11 @@ import org.slf4j.LoggerFactory;
         .collect(Collectors.toList());
   }
 
+  void addTable(TableSpec tableSpec, JobNode node) {
+    tables.add(tableSpec);
+    node.addTable(tableSpec);
+  }
+
   @Override
   public String getPlanAsJson() throws Exception {
     return jsonGenerator.toJson(this);
@@ -211,6 +219,14 @@ import org.slf4j.LoggerFactory;
   }
 
   /**
+   * Return the tables in the graph
+   * @return unmodifiable set of {@link TableSpec}
+   */
+  Set<TableSpec> getTables() {
+    return Collections.unmodifiableSet(tables);
+  }
+
+  /**
    * Return the intermediate streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 03845e3..2729fa3 100644
--- 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -28,12 +28,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
+import org.apache.samza.table.TableSpec;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -54,6 +57,15 @@ import org.codehaus.jackson.map.ObjectMapper;
     int partitionCount;
   }
 
+  static final class TableSpecJson {
+    @JsonProperty("id")
+    String id;
+    @JsonProperty("tableProviderFactory")
+    String tableProviderFactory;
+    @JsonProperty("config")
+    Map<String, String> config;
+  }
+
   static final class StreamEdgeJson {
     @JsonProperty("streamSpec")
     StreamSpecJson streamSpec;
@@ -97,6 +109,8 @@ import org.codehaus.jackson.map.ObjectMapper;
     Map<String, StreamEdgeJson> sinkStreams;
     @JsonProperty("intermediateStreams")
     Map<String, StreamEdgeJson> intermediateStreams;
+    @JsonProperty("tables")
+    Map<String, TableSpecJson> tables;
     @JsonProperty("applicationName")
     String applicationName;
     @JsonProperty("applicationId")
@@ -119,9 +133,11 @@ import org.codehaus.jackson.map.ObjectMapper;
     jobGraphJson.sourceStreams = new HashMap<>();
     jobGraphJson.sinkStreams = new HashMap<>();
     jobGraphJson.intermediateStreams = new HashMap<>();
+    jobGraphJson.tables = new HashMap<>();
     jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, 
jobGraphJson.sourceStreams));
     jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, 
jobGraphJson.sinkStreams));
     jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, 
jobGraphJson.intermediateStreams));
+    jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables));
 
     jobGraphJson.jobs = jobGraph.getJobNodes().stream()
         .map(jobNode -> buildJobNodeJson(jobNode))
@@ -206,6 +222,11 @@ import org.codehaus.jackson.map.ObjectMapper;
       map.put("outputStreamId", outputStream.getStreamSpec().getId());
     }
 
+    if (spec instanceof StreamTableJoinOperatorSpec) {
+      TableSpec tableSpec = ((StreamTableJoinOperatorSpec) 
spec).getTableSpec();
+      map.put("tableId", tableSpec.getId());
+    }
+
     if (spec instanceof JoinOperatorSpec) {
       map.put("ttlMs", ((JoinOperatorSpec) spec).getTtlMs());
     }
@@ -247,4 +268,33 @@ import org.codehaus.jackson.map.ObjectMapper;
     }
     return edgeJson;
   }
+
+  /**
+   * Get or create the JSON POJO for a {@link TableSpec}
+   * @param tableSpec the {@link TableSpec}
+   * @param tableSpecs a map of tableId to {@link TableSpecJson}
+   * @return JSON representation of the {@link TableSpec}
+   */
+  private TableSpecJson buildTableJson(TableSpec tableSpec, Map<String, 
TableSpecJson> tableSpecs) {
+    String tableId = tableSpec.getId();
+    TableSpecJson tableSpecJson = tableSpecs.get(tableId);
+    if (tableSpecJson == null) {
+      tableSpecJson = buildTableJson(tableSpec);
+      tableSpecs.put(tableId, tableSpecJson);
+    }
+    return tableSpecJson;
+  }
+
+  /**
+   * Create the JSON POJO for a {@link TableSpec}
+   * @param tableSpec the {@link TableSpec}
+   * @return JSON representation of the {@link TableSpec}
+   */
+  private TableSpecJson buildTableJson(TableSpec tableSpec) {
+    TableSpecJson tableSpecJson = new TableSpecJson();
+    tableSpecJson.id = tableSpec.getId();
+    tableSpecJson.tableProviderFactory = 
tableSpec.getTableProviderFactoryClassName();
+    tableSpecJson.config = tableSpec.getConfig();
+    return tableSpecJson;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 2e89292..4e337d9 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.execution;
 
-import com.google.common.base.Joiner;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collection;
@@ -29,7 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
+
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
@@ -47,10 +48,16 @@ import org.apache.samza.operators.util.MathUtils;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+
+
 /**
  * A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's 
a job that will be submitted
  * to remote cluster. In LocalExecutionEnvironment, it's a set of 
StreamProcessors for local execution.
@@ -67,6 +74,7 @@ public class JobNode {
   private final StreamGraphImpl streamGraph;
   private final List<StreamEdge> inEdges = new ArrayList<>();
   private final List<StreamEdge> outEdges = new ArrayList<>();
+  private final List<TableSpec> tables = new ArrayList<>();
   private final Config config;
 
   JobNode(String jobName, String jobId, StreamGraphImpl streamGraph, Config 
config) {
@@ -109,6 +117,10 @@ public class JobNode {
     return outEdges;
   }
 
+  void addTable(TableSpec tableSpec) {
+    tables.add(tableSpec);
+  }
+
   /**
    * Generate the configs for a job
    * @param executionPlanJson JSON representation of the execution plan
@@ -147,6 +159,19 @@ public class JobNode {
     // write serialized serde instances and stream serde configs to configs
     addSerdeConfigs(configs);
 
+    tables.forEach(tableSpec -> {
+        // Table provider factory
+        configs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, 
tableSpec.getId()),
+            tableSpec.getTableProviderFactoryClassName());
+
+        // Note: no need to generate config for Serde's, as they are already 
produced by addSerdeConfigs()
+
+        // Generate additional configuration
+        TableProviderFactory tableProviderFactory = 
Util.getObj(tableSpec.getTableProviderFactoryClassName());
+        TableProvider tableProvider = 
tableProviderFactory.getTableProvider(tableSpec);
+        configs.putAll(tableProvider.generateConfig(configs));
+      });
+
     log.info("Job {} has generated configs {}", jobName, configs);
 
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
@@ -209,11 +234,21 @@ public class JobNode {
         }
       });
 
+    // collect all key and msg serde instances for tables
+    Map<String, Serde> tableKeySerdes = new HashMap<>();
+    Map<String, Serde> tableValueSerdes = new HashMap<>();
+    tables.forEach(tableSpec -> {
+        tableKeySerdes.put(tableSpec.getId(), 
tableSpec.getSerde().getKeySerde());
+        tableValueSerdes.put(tableSpec.getId(), 
tableSpec.getSerde().getValueSerde());
+      });
+
     // for each unique stream or store serde instance, generate a unique name 
and serialize to config
     HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
     serdes.addAll(streamMsgSerdes.values());
     serdes.addAll(storeKeySerdes.values());
     serdes.addAll(storeMsgSerdes.values());
+    serdes.addAll(tableKeySerdes.values());
+    serdes.addAll(tableValueSerdes.values());
     SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
     Base64.Encoder base64Encoder = Base64.getEncoder();
     Map<Serde, String> serdeUUIDs = new HashMap<>();
@@ -247,6 +282,17 @@ public class JobNode {
         String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), 
storeName);
         configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
       });
+
+    // set key and msg serdes for tables to the serde names generated above
+    tableKeySerdes.forEach((tableId, serde) -> {
+        String keySerdeConfigKey = 
String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
+        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    tableValueSerdes.forEach((tableId, serde) -> {
+        String valueSerdeConfigKey = 
String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
+        configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
+      });
   }
 
   /**
@@ -264,10 +310,14 @@ public class JobNode {
 
     // Filter out the join operators, and obtain a list of their ttl values
     List<Long> joinTtlIntervals = operatorSpecs.stream()
-        .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.JOIN)
+        .filter(spec -> spec instanceof JoinOperatorSpec)
         .map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
         .collect(Collectors.toList());
 
+    if (joinTtlIntervals.isEmpty()) {
+      return -1;
+    }
+
     // Combine both the above lists
     List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals);
     candidateTimerIntervals.addAll(windowTimerIntervals);

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java 
b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
new file mode 100644
index 0000000..b875c2e
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Base class for all table descriptor implementations.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+abstract public class BaseTableDescriptor<K, V, D extends 
BaseTableDescriptor<K, V, D>>
+    implements TableDescriptor<K, V, D> {
+
+  protected final String tableId;
+
+  protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
+
+  protected final Map<String, String> config = new HashMap<>();
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table
+   */
+  protected BaseTableDescriptor(String tableId) {
+    this.tableId = tableId;
+  }
+
+  @Override
+  public D withConfig(String key, String value) {
+    config.put(key, value);
+    return (D) this;
+  }
+
+  @Override
+  public D withSerde(KVSerde<K, V> serde) {
+    if (serde == null) {
+      throw new IllegalArgumentException("Serde cannot be null");
+    }
+    this.serde = serde;
+    return (D) this;
+  }
+
+  @Override
+  public String getTableId() {
+    return tableId;
+  }
+
+  /**
+   * Generate config for {@link TableSpec}; this method is used internally.
+   * @param tableSpecConfig configuration for the {@link TableSpec}
+   */
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    tableSpecConfig.putAll(config);
+  }
+
+  /**
+   * Validate that this table descriptor is constructed properly; this method 
is used internally.
+   */
+  protected void validate() {
+  }
+
+  /**
+   * Create a {@link TableSpec} from this table descriptor; this method is 
used internally.
+   *
+   * @return the {@link TableSpec}
+   */
+  abstract public TableSpec getTableSpec();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 3f4e40d..07af54f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -19,12 +19,17 @@
 
 package org.apache.samza.operators;
 
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
 import org.apache.samza.SamzaException;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
@@ -32,18 +37,18 @@ import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
-
-import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
 
 
 /**
@@ -138,6 +143,16 @@ public class MessageStreamImpl<M> implements 
MessageStream<M> {
   }
 
   @Override
+  public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
+      StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> 
joinFn) {
+    TableSpec tableSpec = ((TableImpl) table).getTableSpec();
+    StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = 
OperatorSpecs.createStreamTableJoinOperatorSpec(
+        tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, 
this.graph.getNextOpId(OpCode.JOIN));
+    this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
+    return new MessageStreamImpl<>(this.graph, joinOpSpec);
+  }
+
+  @Override
   public MessageStream<M> merge(Collection<? extends MessageStream<? extends 
M>> otherStreams) {
     if (otherStreams.isEmpty()) return this;
     String opId = this.graph.getNextOpId(OpCode.MERGE);
@@ -176,4 +191,12 @@ public class MessageStreamImpl<M> implements 
MessageStream<M> {
   protected OperatorSpec<?, M> getOperatorSpec() {
     return this.operatorSpec;
   }
+
+  @Override
+  public <K, V> void sendTo(Table<KV<K, V>> table) {
+    SendToTableOperatorSpec<K, V> op = 
OperatorSpecs.createSendToTableOperatorSpec(
+        this.operatorSpec, ((TableImpl) table).getTableSpec(), 
this.graph.getNextOpId(OpCode.SEND_TO));
+    this.operatorSpec.registerNextOperatorSpec(op);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index d014cb9..b607c62 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,7 +18,15 @@
  */
 package org.apache.samza.operators;
 
-import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -34,17 +42,12 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
+import com.google.common.base.Preconditions;
 
 /**
  * A {@link StreamGraph} that provides APIs for accessing {@link 
MessageStream}s to be used to
@@ -57,6 +60,7 @@ public class StreamGraphImpl implements StreamGraph {
   // We use a LHM for deterministic order in initializing and closing 
operators.
   private final Map<StreamSpec, InputOperatorSpec> inputOperators = new 
LinkedHashMap<>();
   private final Map<StreamSpec, OutputStreamImpl> outputStreams = new 
LinkedHashMap<>();
+  private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
   private final ApplicationRunner runner;
   private final Config config;
 
@@ -146,6 +150,18 @@ public class StreamGraphImpl implements StreamGraph {
   }
 
   @Override
+  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) {
+    TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec();
+    if (tables.containsKey(tableSpec)) {
+      throw new IllegalStateException(String.format(
+          "getTable() invoked multiple times with the same tableId: %s",
+          tableDesc.getTableId()));
+    }
+    tables.put(tableSpec, new TableImpl(tableSpec));
+    return tables.get(tableSpec);
+  }
+
+  @Override
   public StreamGraph withContextManager(ContextManager contextManager) {
     this.contextManager = contextManager;
     return this;
@@ -163,7 +179,7 @@ public class StreamGraphImpl implements StreamGraph {
    */
   <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, 
Serde<M> serde) {
     StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    
+
     Preconditions.checkState(!inputOperators.containsKey(streamSpec) && 
!outputStreams.containsKey(streamSpec),
         "getIntermediateStream must not be called multiple times with the same 
streamId: " + streamId);
 
@@ -190,6 +206,10 @@ public class StreamGraphImpl implements StreamGraph {
     return Collections.unmodifiableMap(outputStreams);
   }
 
+  public Map<TableSpec, TableImpl> getTables() {
+    return Collections.unmodifiableMap(tables);
+  }
+
   public ContextManager getContextManager() {
     return this.contextManager;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
new file mode 100644
index 0000000..e671534
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * This class is the holder of a {@link TableSpec}
+ */
+public class TableImpl implements Table {
+
+  private final TableSpec tableSpec;
+
+  public TableImpl(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 0bb12d2..ea278c1 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -18,10 +18,15 @@
  */
 package org.apache.samza.operators.impl;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
@@ -29,6 +34,7 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -36,8 +42,9 @@ import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskContext;
@@ -45,14 +52,9 @@ import org.apache.samza.util.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 
 /**
  * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link 
OperatorSpec}s.
@@ -212,6 +214,10 @@ public class OperatorImplGraph {
       return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
     } else if (operatorSpec instanceof JoinOperatorSpec) {
       return createPartialJoinOperatorImpl(prevOperatorSpec, 
(JoinOperatorSpec) operatorSpec, config, context, clock);
+    } else if (operatorSpec instanceof StreamTableJoinOperatorSpec) {
+      return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) 
operatorSpec, config, context);
+    } else if (operatorSpec instanceof SendToTableOperatorSpec) {
+      return new SendToTableOperatorImpl((SendToTableOperatorSpec) 
operatorSpec, config, context);
     }
     throw new IllegalArgumentException(
         String.format("Unsupported OperatorSpec: %s", 
operatorSpec.getClass().getName()));

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
new file mode 100644
index 0000000..5ce1328
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a send-stream-to-table operator that stores the record
+ * in the table.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ */
+public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, 
Void> {
+
+  private final SendToTableOperatorSpec<K, V> sendToTableOpSpec;
+  private final ReadWriteTable<K, V> table;
+
+  SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, 
Config config, TaskContext context) {
+    this.sendToTableOpSpec = sendToTableOpSpec;
+    this.table = (ReadWriteTable) 
context.getTable(sendToTableOpSpec.getTableSpec().getId());
+  }
+
+  @Override
+  protected void handleInit(Config config, TaskContext context) {
+  }
+
+  @Override
+  protected Collection<Void> handleMessage(KV<K, V> message, MessageCollector 
collector, TaskCoordinator coordinator) {
+    table.put(message.getKey(), message.getValue());
+    // there should be no further chained operators since this is a terminal 
operator.
+    return Collections.emptyList();
+  }
+
+  @Override
+  protected void handleClose() {
+    table.close();
+  }
+
+  @Override
+  protected OperatorSpec<KV<K, V>, Void> getOperatorSpec() {
+    return sendToTableOpSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
new file mode 100644
index 0000000..54a5770
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a stream-table join operator that first retrieves the 
table value
+ * for the key of the incoming message, and then applies the join function.
+ *
+ * @param <K> type of the join key
+ * @param <M> type of input messages
+ * @param <R> type of the table record
+ * @param <JM> type of the join result
+ */
+class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends 
OperatorImpl<M, JM> {
+
+  private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec;
+  private final ReadableTable<K, ?> table;
+
+  StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> 
joinOpSpec,
+      Config config, TaskContext context) {
+    this.joinOpSpec = joinOpSpec;
+    this.table = (ReadableTable) 
context.getTable(joinOpSpec.getTableSpec().getId());
+  }
+
+  @Override
+  protected void handleInit(Config config, TaskContext context) {
+    this.joinOpSpec.getJoinFn().init(config, context);
+  }
+
+  @Override
+  public Collection<JM> handleMessage(M message, MessageCollector collector, 
TaskCoordinator coordinator) {
+    K key = joinOpSpec.getJoinFn().getMessageKey(message);
+    Object recordValue = table.get(key);
+    R record = recordValue != null ? (R) KV.of(key, recordValue) : null;
+    JM output = joinOpSpec.getJoinFn().apply(message, record);
+
+    // The support for inner and outer join will be provided in the joinFn. For 
inner join, the joinFn might
+    // return null when the corresponding record is absent in the table.
+    return output != null ?
+        Collections.singletonList(output)
+      : Collections.emptyList();
+  }
+
+  @Override
+  protected void handleClose() {
+    this.joinOpSpec.getJoinFn().close();
+  }
+
+  protected OperatorSpec<M, JM> getOperatorSpec() {
+    return joinOpSpec;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 17f1b49..2a5991c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -18,17 +18,17 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.Set;
 
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.WatermarkFunction;
+
 /**
- * A stream operator specification that holds all the information required to 
transform 
+ * A stream operator specification that holds all the information required to 
transform
  * the input {@link org.apache.samza.operators.MessageStreamImpl} and produce 
the output
  * {@link org.apache.samza.operators.MessageStreamImpl}.
  *

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 1b3b8aa..c752fe2 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -19,6 +19,10 @@
 
 package org.apache.samza.operators.spec;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Function;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -26,15 +30,13 @@ import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableSpec;
 import org.apache.samza.task.TaskContext;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.Function;
-
 
 /**
  * Factory methods for creating {@link OperatorSpec} instances.
@@ -242,4 +244,38 @@ public class OperatorSpecs {
         },
         OperatorSpec.OpCode.MERGE, opId);
   }
+
+  /**
+   * Creates a {@link StreamTableJoinOperatorSpec} with a join function.
+   *
+   * @param tableSpec the table spec for the table on the right side of the 
join
+   * @param joinFn the user-defined join function to get join keys and results
+   * @param opId the unique ID of the operator
+   * @param <K> the type of join key
+   * @param <M> the type of input messages
+   * @param <R> the type of table record
+   * @param <JM> the type of the join result
+   * @return the {@link StreamTableJoinOperatorSpec}
+   */
+  public static <K, M, R, JM> StreamTableJoinOperatorSpec<K, M, R, JM> 
createStreamTableJoinOperatorSpec(
+      TableSpec tableSpec, StreamTableJoinFunction<K, M, R, JM> joinFn, String 
opId) {
+    return new StreamTableJoinOperatorSpec(tableSpec, joinFn, opId);
+  }
+
+  /**
+   * Creates a {@link SendToTableOperatorSpec} for writing the messages of 
an input stream to a table;
+   * the type of incoming message is expected to be KV&#60;K, V&#62;.
+   *
+   * @param inputOpSpec the operator spec for the input stream
+   * @param tableSpec the table spec for the underlying table
+   * @param opId the unique ID of the operator
+   * @param <K> the type of the table record key
+   * @param <V> the type of the table record value
+   * @return the {@link SendToTableOperatorSpec}
+   */
+  public static <K, V> SendToTableOperatorSpec<K, V> 
createSendToTableOperatorSpec(
+      OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec tableSpec, String opId) 
{
+    return new SendToTableOperatorSpec(inputOpSpec, tableSpec, opId);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
new file mode 100644
index 0000000..9084be2
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * The spec for an operator that writes a stream to a table by extracting keys 
and values
+ * from the incoming messages.
+ *
+ * @param <K> the type of the table record key
+ * @param <V> the type of the table record value
+ */
[email protected]
+public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, 
Void> {
+
+  private final OperatorSpec<?, KV<K, V>> inputOpSpec;
+  private final TableSpec tableSpec;
+
+  /**
+   * Constructor for a {@link SendToTableOperatorSpec}.
+   *
+   * @param inputOpSpec  the operator spec of the input stream
+   * @param tableSpec  the table spec of the table written to
+   * @param opId  the unique ID for this operator
+   */
+  SendToTableOperatorSpec(OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec 
tableSpec, String opId) {
+    super(OpCode.SEND_TO, opId);
+    this.inputOpSpec = inputOpSpec;
+    this.tableSpec = tableSpec;
+  }
+
+  public OperatorSpec<?, KV<K, V>> getInputOpSpec() {
+    return inputOpSpec;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return null;
+  }
+}

Reply via email to