http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
new file mode 100644
index 0000000..730913a
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * The spec for stream-table join operator that retrieves a record from the 
table using key
+ * derived from the incoming message and joins with the incoming message.
+ *
+ * @param <M>  the type of input messages
+ * @param <R>  the type of table record
+ * @param <JM>  the type of join result
+ */
[email protected]
+public class StreamTableJoinOperatorSpec<K, M, R, JM> extends OperatorSpec<M, 
JM> {
+
+  private final TableSpec tableSpec;
+  private final StreamTableJoinFunction<K, M, R, JM> joinFn;
+
+  /**
+   * Constructor for {@link StreamTableJoinOperatorSpec}.
+   *
+   * @param tableSpec  the table spec for the table on the right side of the 
join
+   * @param joinFn  the user-defined join function to get join keys and results
+   * @param opId  the unique ID for this operator
+   */
+  StreamTableJoinOperatorSpec(TableSpec tableSpec, StreamTableJoinFunction<K, 
M, R, JM> joinFn, String opId) {
+    super(OpCode.JOIN, opId);
+    this.tableSpec = tableSpec;
+    this.joinFn = joinFn;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+  public StreamTableJoinFunction<K, M, R, JM> getJoinFn() {
+    return this.joinFn;
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : 
null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java 
b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
new file mode 100644
index 0000000..c3555f3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TableManager} manages tables within a Samza task. For each table, 
it maintains
+ * the {@link TableSpec} and the {@link TableProvider}. It is used at 
execution for
+ * {@link org.apache.samza.container.TaskInstance} to retrieve table instances 
for
+ * read/write operations.
+ *
+ * A {@link TableManager} is constructed from job configuration, the {@link 
TableSpec}
+ * and {@link TableProvider} are constructed by processing the job 
configuration.
+ *
+ * After a {@link TableManager} is constructed, local tables are associated 
with
+ * local store instances created during {@link 
org.apache.samza.container.SamzaContainer}
+ * initialization.
+ *
+ * Method {@link TableManager#getTable(String)} will throw {@link 
IllegalStateException},
+ * if it's called before initialization.
+ *
+ * For store backed tables, the list of stores must be injected into the 
constructor.
+ */
+public class TableManager {
+
+  static public class TableCtx {
+    private TableSpec tableSpec;
+    private TableProvider tableProvider;
+  }
+
+  private final Logger logger = 
LoggerFactory.getLogger(TableManager.class.getName());
+
+  // tableId -> TableCtx
+  private final Map<String, TableCtx> tables = new HashMap<>();
+
+  private boolean localTablesInitialized;
+
+  /**
+   * Construct a table manager instance
+   * @param config the job configuration
+   * @param serdes Serde instances for tables
+   */
+  public TableManager(Config config, Map<String, Serde<Object>> serdes) {
+
+    new JavaTableConfig(config).getTableIds().forEach(tableId -> {
+
+        // Construct the table provider
+        String tableProviderFactory = 
config.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId));
+
+        // Construct the KVSerde
+        JavaTableConfig tableConfig = new JavaTableConfig(config);
+        KVSerde serde = KVSerde.of(
+            serdes.get(tableConfig.getKeySerde(tableId)),
+            serdes.get(tableConfig.getValueSerde(tableId)));
+
+        TableSpec tableSpec = new TableSpec(tableId, serde, 
tableProviderFactory,
+            config.subset(String.format(JavaTableConfig.TABLE_ID_PREFIX, 
tableId) + "."));
+
+        addTable(tableSpec);
+
+        logger.info("Added table " + tableSpec.getId());
+      });
+  }
+
+  /**
+   * Initialize all local table
+   * @param stores stores created locally
+   */
+  public void initLocalTables(Map<String, StorageEngine> stores) {
+    tables.values().forEach(ctx -> {
+        if (ctx.tableProvider instanceof LocalStoreBackedTableProvider) {
+          StorageEngine store = stores.get(ctx.tableSpec.getId());
+          if (store == null) {
+            throw new SamzaException(String.format(
+                "Backing store for table %s was not injected by 
SamzaContainer",
+                ctx.tableSpec.getId()));
+          }
+          ((LocalStoreBackedTableProvider) ctx.tableProvider).init(store);
+        }
+      });
+
+    localTablesInitialized = true;
+  }
+
+  /**
+   * Add a table to the table manager
+   * @param tableSpec the table spec
+   */
+  private void addTable(TableSpec tableSpec) {
+    if (tables.containsKey(tableSpec.getId())) {
+      throw new SamzaException("Table " + tableSpec.getId() + " already 
exists");
+    }
+    TableCtx ctx = new TableCtx();
+    TableProviderFactory tableProviderFactory = 
Util.getObj(tableSpec.getTableProviderFactoryClassName());
+    ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec);
+    ctx.tableSpec = tableSpec;
+    tables.put(tableSpec.getId(), ctx);
+  }
+
+  /**
+   * Start the table manager, internally it starts all tables
+   */
+  public void start() {
+    tables.values().forEach(ctx -> ctx.tableProvider.start());
+  }
+
+  /**
+   * Shutdown the table manager, internally it shuts down all tables
+   */
+  public void shutdown() {
+    tables.values().forEach(ctx -> ctx.tableProvider.stop());
+  }
+
+  /**
+   * Get a table instance
+   * @param tableId Id of the table
+   * @return table instance
+   */
+  public Table getTable(String tableId) {
+    if (!localTablesInitialized) {
+      throw new IllegalStateException("Local tables in TableManager not 
initialized.");
+    }
+    return tables.get(tableId).tableProvider.getTable();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 412e9dc..f465bfc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -20,65 +20,37 @@
 package org.apache.samza.container
 
 import java.io.File
+import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
 import java.util
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
-import java.net.{URL, UnknownHostException}
 import java.util.Base64
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
-import org.apache.samza.{SamzaContainerStatus, SamzaException}
 import org.apache.samza.checkpoint.{CheckpointListener, 
CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config._
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.container.disk.DiskQuotaPolicyFactory
-import org.apache.samza.container.disk.DiskSpaceMonitor
+import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
-import org.apache.samza.container.disk.NoThrottlingDiskQuotaPolicyFactory
-import org.apache.samza.container.disk.PollingScanDiskSpaceMonitor
+import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, 
DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, 
PollingScanDiskSpaceMonitor}
 import org.apache.samza.container.host.{StatisticsMonitorImpl, 
SystemMemoryStatistics, SystemStatisticsMonitor}
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
-import org.apache.samza.metrics.JmxServer
-import org.apache.samza.metrics.JvmMetrics
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.serializers.IntermediateMessageSerde
-import org.apache.samza.serializers.NoOpSerde
-import org.apache.samza.serializers.SerializableSerde
-import org.apache.samza.serializers.Serde
-import org.apache.samza.serializers.SerdeFactory
-import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.serializers.StringSerde
+import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, 
MetricsReporter}
+import org.apache.samza.serializers._
 import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemConsumersMetrics
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.system.SystemProducersMetrics
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.chooser.DefaultChooser
-import org.apache.samza.system.chooser.MessageChooserFactory
-import org.apache.samza.system.chooser.RoundRobinChooserFactory
+import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager}
+import org.apache.samza.system._
+import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, 
RoundRobinChooserFactory}
+import org.apache.samza.table.TableManager
 import org.apache.samza.task._
-import org.apache.samza.util.HighResolutionClock
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Throttleable
-import org.apache.samza.util.MetricsReporterLoader
-import org.apache.samza.util.SystemClock
-import org.apache.samza.util.Util
 import org.apache.samza.util.Util.asScalaClock
+import org.apache.samza.util._
+import org.apache.samza.{SamzaContainerStatus, SamzaException}
 
 import scala.collection.JavaConverters._
 
@@ -568,6 +540,11 @@ object SamzaContainer extends Logging {
         new StorageConfig(config).getChangeLogDeleteRetentionsInMs,
         new SystemClock)
 
+      val tableManager = new TableManager(config, serdes.asJava)
+      tableManager.initLocalTables(taskStores.asJava)
+
+      info("Got table manager");
+
       val systemStreamPartitions = taskModel
         .getSystemStreamPartitions
         .asScala
@@ -586,6 +563,7 @@ object SamzaContainer extends Logging {
           containerContext = containerContext,
           offsetManager = offsetManager,
           storageManager = storageManager,
+          tableManager = tableManager,
           reporters = reporters,
           systemStreamPartitions = systemStreamPartitions,
           exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, 
config),
@@ -711,6 +689,7 @@ class SamzaContainer(
       startOffsetManager
       startLocalityManager
       startStores
+      startTableManager
       startDiskSpaceMonitor
       startHostStatisticsMonitor
       startProducers
@@ -745,6 +724,7 @@ class SamzaContainer(
 
       shutdownConsumers
       shutdownTask
+      shutdownTableManager
       shutdownStores
       shutdownDiskSpaceMonitor
       shutdownHostStatisticsMonitor
@@ -885,9 +865,9 @@ class SamzaContainer(
   }
 
   def startStores {
-    info("Starting task instance stores.")
     taskInstances.values.foreach(taskInstance => {
       val startTime = System.currentTimeMillis()
+      info("Starting stores in task instance %s" format taskInstance.taskName)
       taskInstance.startStores
       // Measuring the time to restore the stores
       val timeToRestore = System.currentTimeMillis() - startTime
@@ -898,6 +878,13 @@ class SamzaContainer(
     })
   }
 
+  def startTableManager: Unit = {
+    taskInstances.values.foreach(taskInstance => {
+      info("Starting table manager in task instance %s" format 
taskInstance.taskName)
+      taskInstance.startTableManager
+    })
+  }
+
   def startTask {
     info("Initializing stream tasks.")
 
@@ -1003,6 +990,12 @@ class SamzaContainer(
     taskInstances.values.foreach(_.shutdownStores)
   }
 
+  def shutdownTableManager: Unit = {
+    info("Shutting down task instance table manager.")
+
+    taskInstances.values.foreach(_.shutdownTableManager)
+  }
+
   def shutdownLocalityManager {
     if(localityManager != null) {
       info("Shutting down locality manager.")

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index acec365..f2a5074 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -27,20 +27,9 @@ import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.task.AsyncStreamTask
-import org.apache.samza.task.ClosableTask
-import org.apache.samza.task.EndOfStreamListenerTask
-import org.apache.samza.task.InitableTask
-import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskCallbackFactory
-import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.task.WindowableTask
+import org.apache.samza.system._
+import org.apache.samza.table.TableManager
+import org.apache.samza.task._
 import org.apache.samza.util.Logging
 
 import scala.collection.JavaConverters._
@@ -56,6 +45,7 @@ class TaskInstance(
   containerContext: SamzaContainerContext,
   val offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
+  tableManager: TableManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   val exceptionHandler: TaskInstanceExceptionHandler = new 
TaskInstanceExceptionHandler,
@@ -68,7 +58,7 @@ class TaskInstance(
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
 
   val context = new TaskContextImpl(taskName,metrics, containerContext, 
systemStreamPartitions.asJava, offsetManager,
-                                    storageManager, jobModel, 
streamMetadataCache)
+                                    storageManager, tableManager, jobModel, 
streamMetadataCache)
 
   // store the (ssp -> if this ssp is catched up) mapping. "catched up"
   // means the same ssp in other taskInstances have the same offset as
@@ -101,6 +91,16 @@ class TaskInstance(
     }
   }
 
+  def startTableManager {
+    if (tableManager != null) {
+      debug("Starting table manager for taskName: %s" format taskName)
+
+      tableManager.start
+    } else {
+      debug("Skipping table manager initialization for taskName: %s" format 
taskName)
+    }
+  }
+
   def initTask {
     if (isInitableTask) {
       debug("Initializing task for taskName: %s" format taskName)
@@ -225,6 +225,16 @@ class TaskInstance(
     }
   }
 
+  def shutdownTableManager {
+    if (tableManager != null) {
+      debug("Shutting down table manager for taskName: %s" format taskName)
+
+      tableManager.shutdown
+    } else {
+      debug("Skipping table manager shutdown for taskName: %s" format taskName)
+    }
+  }
+
   override def toString() = "TaskInstance for class %s and taskName %s." 
format (task.getClass.getName, taskName)
 
   def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, 
closable=%s endofstreamlistener=%s]" format

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
new file mode 100644
index 0000000..2775ca7
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+
+public class TestJavaTableConfig {
+  @Test
+  public void testGetTableIds() {
+    Set<String> ids = Sets.newHashSet("t1", "t2");
+    Map<String, String> map = ids.stream()
+        .map(id -> String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, id))
+        .collect(Collectors.toMap(key -> key, key -> key + 
"-provider-factory"));
+    JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map));
+
+    assertEquals(2, tableConfig.getTableIds().size());
+
+    ids.removeAll(tableConfig.getTableIds());
+    assertTrue(ids.isEmpty());
+  }
+
+  @Test
+  public void testGetTableProperties() {
+    Map<String, String> map = new HashMap<>();
+    map.put("tables.t1.spec", "t1-spec");
+    map.put("tables.t1.provider.factory", "t1-provider-factory");
+    JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map));
+    assertEquals("t1-provider-factory", 
tableConfig.getTableProviderFactory("t1"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index d97d494..96e234e 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -18,7 +18,12 @@
  */
 package org.apache.samza.operators;
 
-import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -27,14 +32,17 @@ import 
org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Window;
@@ -42,14 +50,11 @@ import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.function.Function;
-import java.util.function.Supplier;
+import com.google.common.collect.ImmutableList;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -313,6 +318,57 @@ public class TestMessageStreamImpl {
   }
 
   @Test
+  public void testSendToTable() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec inputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> source = new 
MessageStreamImpl<>(mockGraph, inputOpSpec);
+
+    TableSpec tableSpec = new TableSpec();
+    TableImpl table = new TableImpl(tableSpec);
+
+    source.sendTo(table);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(inputOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = 
registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof SendToTableOperatorSpec);
+    SendToTableOperatorSpec sendToTableOperatorSpec = 
(SendToTableOperatorSpec) registeredOpSpec;
+
+    assertEquals(OpCode.SEND_TO, sendToTableOperatorSpec.getOpCode());
+    assertEquals(inputOpSpec, sendToTableOperatorSpec.getInputOpSpec());
+    assertEquals(tableSpec, sendToTableOperatorSpec.getTableSpec());
+  }
+
+  @Test
+  public void testStreamTableJoin() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<KV<String, TestMessageEnvelope>> source1 = new 
MessageStreamImpl<>(mockGraph, leftInputOpSpec);
+    OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> source2 = new 
MessageStreamImpl<>(mockGraph, rightInputOpSpec);
+
+    TableSpec tableSpec = new TableSpec();
+    TableImpl table = new TableImpl(tableSpec);
+
+    source2.sendTo(table);
+
+    StreamTableJoinFunction<String, KV<String, TestMessageEnvelope>, 
KV<String, TestMessageEnvelope>, TestOutputMessageEnvelope>
+        mockJoinFn = mock(StreamTableJoinFunction.class);
+    source1.join(table, mockJoinFn);
+
+    ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
+    
verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> leftRegisteredOpSpec = 
leftRegisteredOpCaptor.getValue();
+
+    assertTrue(leftRegisteredOpSpec instanceof StreamTableJoinOperatorSpec);
+    StreamTableJoinOperatorSpec joinOpSpec = (StreamTableJoinOperatorSpec) 
leftRegisteredOpSpec;
+    assertEquals(OpCode.JOIN, joinOpSpec.getOpCode());
+    assertEquals(mockJoinFn, joinOpSpec.getJoinFn());
+    assertEquals(tableSpec, joinOpSpec.getTableSpec());
+  }
+
+  @Test
   public void testMerge() {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index cf0a198..3bb44b5 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -12,14 +12,16 @@
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied.  See the License for THE
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.samza.operators;
 
-import com.google.common.collect.ImmutableList;
-import junit.framework.Assert;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -33,10 +35,11 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
+import junit.framework.Assert;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -583,4 +586,16 @@ public class TestStreamGraphImpl {
     Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
     Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
   }
+
+  @Test
+  public void testGetTable() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
+    when(mockTableDescriptor.getTableSpec()).thenReturn(
+        new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", 
new HashMap<>()));
+    Assert.assertNotNull(graph.getTable(mockTableDescriptor));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
new file mode 100644
index 0000000..d8b2e8d
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestStreamTableJoinOperatorImpl {
+  @Test
+  public void testHandleMessage() {
+
+    String tableId = "t1";
+    TableSpec tableSpec = mock(TableSpec.class);
+    when(tableSpec.getId()).thenReturn(tableId);
+
+    StreamTableJoinOperatorSpec mockJoinOpSpec = 
mock(StreamTableJoinOperatorSpec.class);
+    when(mockJoinOpSpec.getTableSpec()).thenReturn(tableSpec);
+    when(mockJoinOpSpec.getJoinFn()).thenReturn(
+        new StreamTableJoinFunction<String, KV<String, String>, KV<String, 
String>, String>() {
+          @Override
+          public String apply(KV<String, String> message, KV<String, String> 
record) {
+            if ("1".equals(message.getKey())) {
+              Assert.assertEquals("m1", message.getValue());
+              Assert.assertEquals("r1", record.getValue());
+              return "m1r1";
+            } else if ("2".equals(message.getKey())) {
+              Assert.assertEquals("m2", message.getValue());
+              Assert.assertNull(record);
+              return null;
+            }
+            throw new SamzaException("Should never reach here!");
+          }
+
+          @Override
+          public String getMessageKey(KV<String, String> message) {
+            return message.getKey();
+          }
+
+          @Override
+          public String getRecordKey(KV<String, String> record) {
+            return record.getKey();
+          }
+        });
+    Config config = mock(Config.class);
+    ReadableTable table = mock(ReadableTable.class);
+    when(table.get("1")).thenReturn("r1");
+    when(table.get("2")).thenReturn(null);
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getTable(tableId)).thenReturn(table);
+
+    MessageCollector mockMessageCollector = mock(MessageCollector.class);
+    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
+
+    StreamTableJoinOperatorImpl streamTableJoinOperator = new 
StreamTableJoinOperatorImpl(
+        mockJoinOpSpec, config, mockTaskContext);
+
+    // Table has the key
+    Collection<TestMessageEnvelope> result;
+    result = streamTableJoinOperator.handleMessage(KV.of("1", "m1"), 
mockMessageCollector, mockTaskCoordinator);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("m1r1", result.iterator().next());
+    // Table doesn't have the key
+    result = streamTableJoinOperator.handleMessage(KV.of("2", "m2"), 
mockMessageCollector, mockTaskCoordinator);
+    Assert.assertEquals(0, result.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java 
b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
new file mode 100644
index 0000000..df5b9e5
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.lang.reflect.Field;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.StorageEngine;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestTableManager {
+
+  private static final String TABLE_ID = "t1";
+
+  public static class DummyTableProviderFactory implements 
TableProviderFactory {
+
+    static Table table;
+    static LocalStoreBackedTableProvider tableProvider;
+
+    @Override
+    public TableProvider getTableProvider(TableSpec tableSpec) {
+      table = mock(Table.class);
+      tableProvider = mock(LocalStoreBackedTableProvider.class);
+      when(tableProvider.getTable()).thenReturn(table);
+      return tableProvider;
+    }
+  }
+
+  @Test
+  public void testInitByConfig() {
+    Map<String, String> map = new HashMap<>();
+    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), 
DummyTableProviderFactory.class.getName());
+    map.put(String.format("tables.%s.some.config", TABLE_ID), "xyz");
+    addKeySerde(map);
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  @Test(expected = Exception.class)
+  public void testInitFailsWithoutProviderFactory() {
+    Map<String, String> map = new HashMap<>();
+    addKeySerde(map);
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  @Test(expected = Exception.class)
+  public void testInitFailsWithoutKeySerde() {
+    Map<String, String> map = new HashMap<>();
+    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), 
DummyTableProviderFactory.class.getName());
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  @Test(expected = Exception.class)
+  public void testInitFailsWithoutValueSerde() {
+    Map<String, String> map = new HashMap<>();
+    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), 
DummyTableProviderFactory.class.getName());
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testInitFailsWithoutInitializingLocalTables() {
+    TableManager tableManager = new TableManager(new MapConfig(new 
HashMap<>()), new HashMap<>());
+    tableManager.getTable("dummy");
+  }
+
+  private void doTestInit(Map<String, String> map) {
+    Map<String, StorageEngine> storageEngines = new HashMap<>();
+    storageEngines.put(TABLE_ID, mock(StorageEngine.class));
+
+    Map<String, Serde<Object>> serdeMap = new HashMap<>();
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde();
+    map.keySet().stream()
+        .filter(k -> k.endsWith(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX()))
+        .forEach(k -> {
+            String serdeName = k
+                .replace(String.format(SerializerConfig.SERIALIZER_PREFIX(), 
""), "")
+                .replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), "");
+            String serializedSerde = map.get(k);
+            byte[] bytes = Base64.getDecoder().decode(serializedSerde);
+            Serde serde = serializableSerde.fromBytes(bytes);
+            serdeMap.put(serdeName, serde);
+          });
+
+    TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
+    tableManager.initLocalTables(storageEngines);
+
+    Table table = tableManager.getTable(TABLE_ID);
+    verify(DummyTableProviderFactory.tableProvider, 
times(1)).init(anyObject());
+    Assert.assertEquals(DummyTableProviderFactory.table, table);
+
+    Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, 
"tables");
+    TableManager.TableCtx ctx = ctxMap.get(TABLE_ID);
+
+    TableSpec tableSpec = getFieldValue(ctx, "tableSpec");
+    Assert.assertEquals(TABLE_ID, tableSpec.getId());
+    Assert.assertEquals(DummyTableProviderFactory.class.getName(), 
tableSpec.getTableProviderFactoryClassName());
+    Assert.assertEquals(IntegerSerde.class, 
tableSpec.getSerde().getKeySerde().getClass());
+    Assert.assertEquals(StringSerde.class, 
tableSpec.getSerde().getValueSerde().getClass());
+    Assert.assertEquals("xyz", tableSpec.getConfig().get("some.config"));
+
+    TableProvider tableProvider = getFieldValue(ctx, "tableProvider");
+    Assert.assertNotNull(tableProvider);
+  }
+
+  private void addKeySerde(Map<String, String> map) {
+    String serdeId = "key-serde";
+    map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), 
serdeId),
+        serializeSerde(new IntegerSerde()));
+    map.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, TABLE_ID), serdeId);
+  }
+
+  private void addValueSerde(Map<String, String> map) {
+    String serdeId = "value-serde";
+    map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), 
serdeId),
+            serializeSerde(new StringSerde("UTF-8")));
+    map.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, TABLE_ID), 
serdeId);
+  }
+
+  private String serializeSerde(Serde serde) {
+    return Base64.getEncoder().encodeToString(new 
SerializableSerde().toBytes(serde));
+  }
+
+  private <T> T getFieldValue(Object object, String fieldName) {
+    Field field = null;
+    try {
+      field = object.getClass().getDeclaredField(fieldName);
+      field.setAccessible(true);
+      return (T) field.get(object);
+    } catch (NoSuchFieldException | IllegalAccessException ex) {
+      throw new SamzaException(ex);
+    } finally {
+      if (field != null) {
+        field.setAccessible(false);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index b399f5f..28a4f8b 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.samza.Partition;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.config.Config;
@@ -47,16 +48,15 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.Option;
 import scala.collection.JavaConverters;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
 
 // TODO(spvenkat) SAMZA-1183: Fix all commented out tests.
 public class TestAsyncRunLoop {
@@ -86,7 +86,7 @@ public class TestAsyncRunLoop {
     scala.collection.immutable.Set<SystemStreamPartition> sspSet = 
JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet();
     return new TaskInstance(task, taskName, mock(Config.class), 
taskInstanceMetrics,
         null, consumers, mock(TaskInstanceCollector.class), 
mock(SamzaContainerContext.class),
-        manager, null, null, sspSet, new 
TaskInstanceExceptionHandler(taskInstanceMetrics, new 
scala.collection.immutable.HashSet<String>()), null, null);
+        manager, null, null, null, sspSet, new 
TaskInstanceExceptionHandler(taskInstanceMetrics, new 
scala.collection.immutable.HashSet<String>()), null, null);
   }
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, 
SystemStreamPartition ssp) {

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
 
b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
new file mode 100644
index 0000000..2681fb3
--- /dev/null
+++ 
b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.storage.kv.BaseLocalStoreBackedTableDescriptor;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table descriptor for in-memory tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class InMemoryTableDescriptor<K, V> extends 
BaseLocalStoreBackedTableDescriptor<K, V, InMemoryTableDescriptor<K, V>> {
+
+  public InMemoryTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    super.generateTableSpecConfig(tableSpecConfig);
+  }
+
+  @Override
+  public TableSpec getTableSpec() {
+
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    return new TableSpec(tableId, serde, 
InMemoryTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  private void addInMemoryConfig(Map<String, String> map, String key, String 
value) {
+    map.put("inmemory." + key, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java
 
b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java
new file mode 100644
index 0000000..c1c2f1c
--- /dev/null
+++ 
b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.storage.kv.BaseLocalStoreBackedTableProvider;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table provider of an in-memory table
+ */
+public class InMemoryTableProvider extends BaseLocalStoreBackedTableProvider {
+
+  public InMemoryTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+  }
+
+  @Override
+  public Map<String, String> generateConfig(Map<String, String> config) {
+
+    Map<String, String> tableConfig = new HashMap<>();
+
+    // Store factory configuration
+    tableConfig.put(String.format(
+        StorageConfig.FACTORY(), tableSpec.getId()),
+        InMemoryKeyValueStorageEngineFactory.class.getName());
+
+    // Common store configuration
+    tableConfig.putAll(generateCommonStoreConfig(config));
+
+    // Rest of the configuration
+    tableSpec.getConfig().forEach((k, v) -> {
+      String realKey = k.startsWith("inmemory.") ?
+          String.format("stores.%s", tableSpec.getId()) + "." + 
k.substring("inmemory.".length())
+        : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + 
"." + k;
+      tableConfig.put(realKey, v);
+    });
+
+    logger.info("Generated configuration for table " + tableSpec.getId());
+
+    return tableConfig;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java
 
b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java
new file mode 100644
index 0000000..f05982a
--- /dev/null
+++ 
b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+/**
+ * Factory class for an in-memory table provider
+ */
+public class InMemoryTableProviderFactory implements TableProviderFactory {
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    return new InMemoryTableProvider(tableSpec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
 
b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
new file mode 100644
index 0000000..840fb70
--- /dev/null
+++ 
b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory;
+
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.TableSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestInMemoryTableDescriptor {
+  @Test
+  public void testTableSpec() {
+
+    TableSpec tableSpec = new InMemoryTableDescriptor<Integer, String>("1")
+        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .withConfig("inmemory.abc", "xyz")
+        .getTableSpec();
+
+    Assert.assertNotNull(tableSpec.getSerde());
+    Assert.assertNotNull(tableSpec.getSerde().getKeySerde());
+    Assert.assertNotNull(tableSpec.getSerde().getValueSerde());
+    Assert.assertEquals("xyz", getConfig(tableSpec, "abc"));
+  }
+
+  private String getConfig(TableSpec tableSpec, String key) {
+    return tableSpec.getConfig().get("inmemory." + key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java
 
b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java
new file mode 100644
index 0000000..76b7a66
--- /dev/null
+++ 
b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestInMemoryTableProvider {
+  @Test
+  public void testGenerateConfig() {
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    tableSpecConfig.put("inmemory.c1", "c1-value");
+    tableSpecConfig.put("inmemory.c2", "c2-value");
+    tableSpecConfig.put("c3", "c3-value");
+    tableSpecConfig.put("c4", "c4-value");
+
+    TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), 
new IntegerSerde()),
+        "my-table-provider-factory", tableSpecConfig);
+
+    Map<String, String> config = new HashMap<>();
+    config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    TableProvider tableProvider = new InMemoryTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(config);
+
+    Assert.assertEquals("ks1", 
tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+    Assert.assertEquals("vs1", 
tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+    Assert.assertEquals(
+        InMemoryKeyValueStorageEngineFactory.class.getName(),
+        tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
+    Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1"));
+    Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
+    Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3"));
+    Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
new file mode 100644
index 0000000..2c62159
--- /dev/null
+++ 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table descriptor for RocksDb backed tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RocksDbTableDescriptor<K, V> extends 
BaseLocalStoreBackedTableDescriptor<K, V, RocksDbTableDescriptor<K, V>> {
+
+  static final public String WRITE_BATCH_SIZE = "write.batch.size";
+  static final public String OBJECT_CACHE_SIZE = "object.cache.size";
+  static final public String CONTAINER_CACHE_SIZE_BYTES = 
"container.cache.size.bytes";
+  static final public String CONTAINER_WRITE_BUFFER_SIZE_BYTES = 
"container.write.buffer.size.bytes";
+  static final public String ROCKSDB_COMPRESSION = "rocksdb.compression";
+  static final public String ROCKSDB_BLOCK_SIZE_BYTES = 
"rocksdb.block.size.bytes";
+  static final public String ROCKSDB_TTL_MS = "rocksdb.ttl.ms";
+  static final public String ROCKSDB_COMPACTION_STYLE = 
"rocksdb.compaction.style";
+  static final public String ROCKSDB_NUM_WRITE_BUFFERS = 
"rocksdb.num.write.buffers";
+  static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = 
"rocksdb.max.log.file.size.bytes";
+  static final public String ROCKSDB_KEEP_LOG_FILE_NUM = 
"rocksdb.keep.log.file.num";
+
+  protected Integer writeBatchSize;
+  protected Integer objectCacheSize;
+  private Integer cacheSize;
+  private Integer writeBufferSize;
+  private Integer blockSize;
+  private Integer ttl;
+  private Integer numWriteBuffers;
+  private Integer maxLogFileSize;
+  private Integer numLogFilesToKeep;
+  private String compressionType;
+  private String compactionStyle;
+
+  public RocksDbTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Refer to <code>stores.store-name.write.batch.size</code> in Samza 
configuration guide
+   * @param writeBatchSize write batch size
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor withWriteBatchSize(int writeBatchSize) {
+    this.writeBatchSize = writeBatchSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.object.cache.size</code> in Samza 
configuration guide
+   * @param objectCacheSize the object cache size
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor withObjectCacheSize(int objectCacheSize) {
+    this.objectCacheSize = objectCacheSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.container.cache.size.bytes</code> in 
Samza configuration guide
+   * @param cacheSize the cache size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCacheSize(int cacheSize) {
+    this.cacheSize = cacheSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.container.write.buffer.size.bytes</code> 
in Samza configuration guide
+   * @param writeBufferSize the write buffer size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withWriteBufferSize(int writeBufferSize) 
{
+    this.writeBufferSize = writeBufferSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.compression</code> in Samza 
configuration guide
+   * @param compressionType the compression type
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCompressionType(String 
compressionType) {
+    this.compressionType = compressionType;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.block.size.bytes</code> in Samza 
configuration guide
+   * @param blockSize the block size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withBlockSize(int blockSize) {
+    this.blockSize = blockSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.ttl.ms</code> in Samza 
configuration guide
+   * @param ttl the time to live in milliseconds
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withTtl(int ttl) {
+    this.ttl = ttl;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.compaction.style</code> in Samza 
configuration guide
+   * @param compactionStyle the compaction style
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCompactionStyle(String 
compactionStyle) {
+    this.compactionStyle = compactionStyle;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.num.write.buffers</code> in 
Samza configuration guide
+   * @param numWriteBuffers the number of write buffers
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withNumWriteBuffers(int numWriteBuffers) 
{
+    this.numWriteBuffers = numWriteBuffers;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.max.log.file.size.bytes</code> 
in Samza configuration guide
+   * @param maxLogFileSize the maximal log file size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withMaxLogFileSize(int maxLogFileSize) {
+    this.maxLogFileSize = maxLogFileSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.num.write.buffers</code> in 
Samza configuration guide
+   * @param numLogFilesToKeep the number of log files to keep
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withNumLogFilesToKeep(int 
numLogFilesToKeep) {
+    this.numLogFilesToKeep = numLogFilesToKeep;
+    return this;
+  }
+
+  /**
+   * Create a table spec based on this table description
+   * @return the table spec
+   */
+  @Override
+  public TableSpec getTableSpec() {
+
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    return new TableSpec(tableId, serde, 
RocksDbTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+
+    super.generateTableSpecConfig(tableSpecConfig);
+
+    if (writeBatchSize != null) {
+      addRocksDbConfig(tableSpecConfig, WRITE_BATCH_SIZE, 
writeBatchSize.toString());
+    }
+    if (objectCacheSize != null) {
+      addRocksDbConfig(tableSpecConfig, OBJECT_CACHE_SIZE, 
objectCacheSize.toString());
+    }
+    if (cacheSize != null) {
+      addRocksDbConfig(tableSpecConfig, CONTAINER_CACHE_SIZE_BYTES, 
cacheSize.toString());
+    }
+    if (writeBufferSize != null) {
+      addRocksDbConfig(tableSpecConfig, CONTAINER_WRITE_BUFFER_SIZE_BYTES, 
writeBufferSize.toString());
+    }
+    if (compressionType != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPRESSION, compressionType);
+    }
+    if (blockSize != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_BLOCK_SIZE_BYTES, 
blockSize.toString());
+    }
+    if (ttl != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_TTL_MS, ttl.toString());
+    }
+    if (compactionStyle != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPACTION_STYLE, 
compactionStyle);
+    }
+    if (numWriteBuffers != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_NUM_WRITE_BUFFERS, 
numWriteBuffers.toString());
+    }
+    if (maxLogFileSize != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, 
maxLogFileSize.toString());
+    }
+    if (numLogFilesToKeep != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_KEEP_LOG_FILE_NUM, 
numLogFilesToKeep.toString());
+    }
+  }
+
+  private void addRocksDbConfig(Map<String, String> map, String key, String 
value) {
+    map.put("rocksdb." + key, value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
new file mode 100644
index 0000000..eb8188f
--- /dev/null
+++ 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table provider for tables backed by RocksDb.
+ */
+public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider {
+
+  public RocksDbTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+  }
+
+  @Override
+  public Map<String, String> generateConfig(Map<String, String> config) {
+
+    Map<String, String> tableConfig = new HashMap<>();
+
+    // Store factory configuration
+    tableConfig.put(String.format(
+        StorageConfig.FACTORY(), tableSpec.getId()),
+        RocksDbKeyValueStorageEngineFactory.class.getName());
+
+    // Common store configuration
+    tableConfig.putAll(generateCommonStoreConfig(config));
+
+    // Rest of the configuration
+    tableSpec.getConfig().forEach((k, v) -> {
+      String realKey = k.startsWith("rocksdb.") ?
+          String.format("stores.%s", tableSpec.getId()) + "." + 
k.substring("rocksdb.".length())
+        : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + 
"." + k;
+      tableConfig.put(realKey, v);
+    });
+
+    logger.info("Generated configuration for table " + tableSpec.getId());
+
+    return tableConfig;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
new file mode 100644
index 0000000..dbe0f97
--- /dev/null
+++ 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+
+public class RocksDbTableProviderFactory implements TableProviderFactory {
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    return new RocksDbTableProvider(tableSpec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
 
b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
new file mode 100644
index 0000000..49fe6eb
--- /dev/null
+++ 
b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestRocksDbTableDescriptor {
+
+  @Test
+  public void testMinimal() {
+    new RocksDbTableDescriptor<Integer, String>("1")
+        .validate();
+  }
+
+  @Test
+  public void testSerde() {
+    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
+        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .getTableSpec();
+    Assert.assertNotNull(tableSpec.getSerde());
+    Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), 
IntegerSerde.class);
+    Assert.assertEquals(tableSpec.getSerde().getValueSerde().getClass(), 
StringSerde.class);
+  }
+
+  @Test
+  public void testTableSpec() {
+
+    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
+        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .withBlockSize(1)
+        .withCacheSize(2)
+        .withCompactionStyle("fifo")
+        .withCompressionType("snappy")
+        .withMaxLogFileSize(3)
+        .withNumLogFilesToKeep(4)
+        .withNumWriteBuffers(5)
+        .withObjectCacheSize(6)
+        .withTtl(7)
+        .withWriteBatchSize(8)
+        .withWriteBufferSize(9)
+        .withConfig("rocksdb.abc", "xyz")
+        .getTableSpec();
+
+    Assert.assertNotNull(tableSpec.getSerde());
+    Assert.assertNotNull(tableSpec.getSerde().getKeySerde());
+    Assert.assertNotNull(tableSpec.getSerde().getValueSerde());
+    Assert.assertEquals("1", getConfig(tableSpec, 
RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES));
+    Assert.assertEquals("2", getConfig(tableSpec, 
RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES));
+    Assert.assertEquals("3", getConfig(tableSpec, 
RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES));
+    Assert.assertEquals("4", getConfig(tableSpec, 
RocksDbTableDescriptor.ROCKSDB_KEEP_LOG_FILE_NUM));
+    Assert.assertEquals("5", getConfig(tableSpec, 
RocksDbTableDescriptor.ROCKSDB_NUM_WRITE_BUFFERS));
+    Assert.assertEquals("6", getConfig(tableSpec, 
RocksDbTableDescriptor.OBJECT_CACHE_SIZE));
+    Assert.assertEquals("7", getConfig(tableSpec, 
RocksDbTableDescriptor.ROCKSDB_TTL_MS));
+    Assert.assertEquals("8", getConfig(tableSpec, 
RocksDbTableDescriptor.WRITE_BATCH_SIZE));
+    Assert.assertEquals("9", getConfig(tableSpec, 
RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES));
+    Assert.assertEquals("snappy", getConfig(tableSpec, 
RocksDbTableDescriptor.ROCKSDB_COMPRESSION));
+    Assert.assertEquals("fifo", getConfig(tableSpec, 
RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE));
+    Assert.assertEquals("xyz", getConfig(tableSpec, "abc"));
+  }
+
+  private String getConfig(TableSpec tableSpec, String key) {
+    return tableSpec.getConfig().get("rocksdb." + key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
 
b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
new file mode 100644
index 0000000..beda5da
--- /dev/null
+++ 
b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestRocksDbTableProvider {
+  @Test
+  public void testGenerateConfig() {
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    tableSpecConfig.put("rocksdb.c1", "c1-value");
+    tableSpecConfig.put("rocksdb.c2", "c2-value");
+    tableSpecConfig.put("c3", "c3-value");
+    tableSpecConfig.put("c4", "c4-value");
+
+    TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), 
new IntegerSerde()),
+        "my-table-provider-factory", tableSpecConfig);
+
+    Map<String, String> config = new HashMap<>();
+    config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    TableProvider tableProvider = new RocksDbTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(config);
+
+    Assert.assertEquals("ks1", 
tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+    Assert.assertEquals("vs1", 
tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+    Assert.assertEquals(
+        RocksDbKeyValueStorageEngineFactory.class.getName(),
+        tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
+    Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1"));
+    Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
+    Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3"));
+    Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
new file mode 100644
index 0000000..1f9b57b
--- /dev/null
+++ 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Map;
+
+import org.apache.samza.operators.BaseTableDescriptor;
+
+
+/**
+ * Table descriptor for store backed tables.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends 
BaseLocalStoreBackedTableDescriptor<K, V, D>>
+    extends BaseTableDescriptor<K, V, D> {
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table
+   */
+  public BaseLocalStoreBackedTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    super.generateTableSpecConfig(tableSpecConfig);
+  }
+
+  /**
+   * Validate that this table descriptor is constructed properly
+   */
+  protected void validate() {
+    super.validate();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
new file mode 100644
index 0000000..4af0f1d
--- /dev/null
+++ 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.table.LocalStoreBackedTableProvider;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for tables backed by Samza stores, see {@link 
LocalStoreBackedTableProvider}.
+ */
+abstract public class BaseLocalStoreBackedTableProvider implements 
LocalStoreBackedTableProvider {
+
+  protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+  protected final TableSpec tableSpec;
+
+  protected KeyValueStore kvStore;
+
+  public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+  }
+
+  @Override
+  public void init(StorageEngine store) {
+    kvStore = (KeyValueStore) store;
+    logger.info("Initialized backing store for table " + tableSpec.getId());
+  }
+
+  @Override
+  public Table getTable() {
+    if (kvStore == null) {
+      throw new SamzaException("Store not initialized for table " + 
tableSpec.getId());
+    }
+    return new LocalStoreBackedReadWriteTable(kvStore);
+  }
+
+  @Override
+  public void start() {
+    logger.info("Starting table provider for table " + tableSpec.getId());
+  }
+
+  @Override
+  public void stop() {
+    logger.info("Stopping table provider for table " + tableSpec.getId());
+  }
+
+  protected Map<String, String> generateCommonStoreConfig(Map<String, String> 
config) {
+
+    Map<String, String> storeConfig = new HashMap<>();
+
+    // We assume the configuration for serde are already generated for this 
table,
+    // so we simply carry them over to store configuration.
+    //
+    JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(config));
+
+    String keySerde = tableConfig.getKeySerde(tableSpec.getId());
+    storeConfig.put(String.format(StorageConfig.KEY_SERDE(), 
tableSpec.getId()), keySerde);
+
+    String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
+    storeConfig.put(String.format(StorageConfig.MSG_SERDE(), 
tableSpec.getId()), valueSerde);
+
+    return storeConfig;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e74998c5/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
new file mode 100644
index 0000000..3149c86
--- /dev/null
+++ 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.List;
+
+import org.apache.samza.table.ReadWriteTable;
+
+
+/**
+ * A store backed readable and writable table
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class LocalStoreBackedReadWriteTable<K, V> extends 
LocalStoreBackedReadableTable<K, V>
+    implements ReadWriteTable<K, V> {
+
+  /**
+   * Constructs an instance of {@link LocalStoreBackedReadWriteTable}
+   * @param kvStore the backing store
+   */
+  public LocalStoreBackedReadWriteTable(KeyValueStore kvStore) {
+    super(kvStore);
+  }
+
+  @Override
+  public void put(K key, V value) {
+    kvStore.put(key, value);
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    entries.forEach(e -> kvStore.put(e.getKey(), e.getValue()));
+  }
+
+  @Override
+  public void delete(K key) {
+    kvStore.delete(key);
+  }
+
+  @Override
+  public void deleteAll(List<K> keys) {
+    keys.forEach(k -> kvStore.delete(k));
+  }
+
+  @Override
+  public void flush() {
+    kvStore.flush();
+  }
+
+}

Reply via email to