Repository: samza
Updated Branches:
  refs/heads/master 12b6d3030 -> 479103b25


SAMZA-1875: Reuse table instances in TableManager

Currently, every call to TableManager.getTable(tableId) invokes
TableProvider.getTable(), which in turn creates a new table instance. So far
we have assumed that end users cache table instances themselves, which is not
always true. Therefore, TableManager should cache the table instance it creates
on the first call and reuse it on subsequent calls.

Author: Wei Song <[email protected]>

Reviewers: Peng Du <[email protected]>

Closes #636 from weisong44/fix_table_manager and squashes the following commits:

839bf76d [Wei Song] Added caching for table instances in TableManager
51562391 [Wei Song] Merge remote-tracking branch 'upstream/master'
de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master'
df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master'
f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master'
4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master'
0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master'
aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master'
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/479103b2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/479103b2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/479103b2

Branch: refs/heads/master
Commit: 479103b25da819341d2a23995bc888a55155df69
Parents: 12b6d30
Author: Wei Song <[email protected]>
Authored: Tue Sep 11 21:25:06 2018 -0700
Committer: Wei Song <[email protected]>
Committed: Tue Sep 11 21:25:06 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/table/TableManager.java    | 34 ++++++++++++--------
 .../apache/samza/table/TestTableManager.java    | 11 ++++---
 2 files changed, 27 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/479103b2/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java 
b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index 186b4a8..ae72414 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -37,12 +37,14 @@ import com.google.common.base.Preconditions;
 
 /**
  * A {@link TableManager} manages tables within a Samza task. For each table, 
it maintains
- * the {@link TableSpec} and the {@link TableProvider}. It is used at 
execution for
- * {@link org.apache.samza.container.TaskInstance} to retrieve table instances 
for
- * read/write operations.
+ * the {@link TableSpec}, the {@link TableProvider} and the {@link Table} 
instance.
+ * It is used at execution for {@link org.apache.samza.container.TaskInstance} 
to retrieve
+ * table instances for read/write operations.
  *
  * A {@link TableManager} is constructed from job configuration, the {@link 
TableSpec}
- * and {@link TableProvider} are constructed by processing the job 
configuration.
+ * and {@link TableProvider} are constructed by processing the job 
configuration
+ * during initialization. The {@link Table} is constructed when {@link 
#getTable(String)}
+ * is called and cached.
  *
  * After a {@link TableManager} is constructed, local tables are associated 
with
  * local store instances created during {@link 
org.apache.samza.container.SamzaContainer}
@@ -51,19 +53,19 @@ import com.google.common.base.Preconditions;
  * Method {@link TableManager#getTable(String)} will throw {@link 
IllegalStateException},
  * if it's called before initialization.
  *
- * For store backed tables, the list of stores must be injected into the 
constructor.
  */
 public class TableManager {
 
   static public class TableCtx {
     private TableSpec tableSpec;
     private TableProvider tableProvider;
+    private Table table;
   }
 
   private final Logger logger = 
LoggerFactory.getLogger(TableManager.class.getName());
 
   // tableId -> TableCtx
-  private final Map<String, TableCtx> tables = new HashMap<>();
+  private final Map<String, TableCtx> tableContexts = new HashMap<>();
 
   private boolean initialized;
 
@@ -100,7 +102,7 @@ public class TableManager {
    */
   public void init(SamzaContainerContext containerContext, TaskContext 
taskContext) {
     Preconditions.checkNotNull(containerContext, "null container context.");
-    tables.values().forEach(ctx -> ctx.tableProvider.init(containerContext, 
taskContext));
+    tableContexts.values().forEach(ctx -> 
ctx.tableProvider.init(containerContext, taskContext));
     initialized = true;
   }
 
@@ -109,7 +111,7 @@ public class TableManager {
    * @param tableSpec the table spec
    */
   private void addTable(TableSpec tableSpec) {
-    if (tables.containsKey(tableSpec.getId())) {
+    if (tableContexts.containsKey(tableSpec.getId())) {
       throw new SamzaException("Table " + tableSpec.getId() + " already 
exists");
     }
     TableCtx ctx = new TableCtx();
@@ -117,14 +119,14 @@ public class TableManager {
         Util.getObj(tableSpec.getTableProviderFactoryClassName(), 
TableProviderFactory.class);
     ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec);
     ctx.tableSpec = tableSpec;
-    tables.put(tableSpec.getId(), ctx);
+    tableContexts.put(tableSpec.getId(), ctx);
   }
 
   /**
    * Shutdown the table manager, internally it shuts down all tables
    */
   public void close() {
-    tables.values().forEach(ctx -> ctx.tableProvider.close());
+    tableContexts.values().forEach(ctx -> ctx.tableProvider.close());
   }
 
   /**
@@ -133,10 +135,14 @@ public class TableManager {
    * @return table instance
    */
   public Table getTable(String tableId) {
-    if (!initialized) {
-      throw new IllegalStateException("TableManager has not been 
initialized.");
+    Preconditions.checkState(initialized, "TableManager has not been 
initialized.");
+
+    TableCtx ctx = tableContexts.get(tableId);
+    Preconditions.checkNotNull(ctx, "Unknown tableId " + tableId);
+
+    if (ctx.table == null) {
+      ctx.table = ctx.tableProvider.getTable();
     }
-    Preconditions.checkArgument(tables.containsKey(tableId), "Unknown 
tableId=" + tableId);
-    return tables.get(tableId).tableProvider.getTable();
+    return ctx.table;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/479103b2/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java 
b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index 24178d0..42f05c0 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -124,11 +124,14 @@ public class TestTableManager {
     TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
     tableManager.init(mock(SamzaContainerContext.class), 
mock(TaskContext.class));
 
-    Table table = tableManager.getTable(TABLE_ID);
-    verify(DummyTableProviderFactory.tableProvider, 
times(1)).init(anyObject(), anyObject());
-    Assert.assertEquals(DummyTableProviderFactory.table, table);
+    for (int i = 0; i < 2; i++) {
+      Table table = tableManager.getTable(TABLE_ID);
+      verify(DummyTableProviderFactory.tableProvider, 
times(1)).init(anyObject(), anyObject());
+      verify(DummyTableProviderFactory.tableProvider, times(1)).getTable();
+      Assert.assertEquals(DummyTableProviderFactory.table, table);
+    }
 
-    Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, 
"tables");
+    Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, 
"tableContexts");
     TableManager.TableCtx ctx = ctxMap.get(TABLE_ID);
 
     TableSpec tableSpec = getFieldValue(ctx, "tableSpec");

Reply via email to