Repository: samza Updated Branches: refs/heads/master 12b6d3030 -> 479103b25
SAMZA-1875: Reuse table instances in TableManager We currently invoke TableProvider.getTable() every time TableManager.getTable(tableId) is invoked, which in turn causes a new table instance to be created. The assumption so far has been that end users would cache table instances, which is not always true. Therefore we should reuse the table instance created earlier in TableManager. Author: Wei Song <[email protected]> Reviewers: Peng Du <[email protected]> Closes #636 from weisong44/fix_table_manager and squashes the following commits: 839bf76d [Wei Song] Added caching for table instances in TableManager 51562391 [Wei Song] Merge remote-tracking branch 'upstream/master' de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master' df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master' f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master' 4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master' 0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master' aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master' a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master' 5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master' 3f7ed71f [Wei Song] Added self to committer list Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/479103b2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/479103b2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/479103b2 Branch: refs/heads/master Commit: 479103b25da819341d2a23995bc888a55155df69 Parents: 12b6d30 Author: Wei Song <[email protected]> Authored: Tue Sep 11 21:25:06 2018 -0700 Committer: Wei Song <[email protected]> Committed: Tue Sep 11 21:25:06 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/table/TableManager.java | 34 ++++++++++++-------- .../apache/samza/table/TestTableManager.java | 11 ++++--- 2 files changed, 
27 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/479103b2/samza-core/src/main/java/org/apache/samza/table/TableManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java index 186b4a8..ae72414 100644 --- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java +++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java @@ -37,12 +37,14 @@ import com.google.common.base.Preconditions; /** * A {@link TableManager} manages tables within a Samza task. For each table, it maintains - * the {@link TableSpec} and the {@link TableProvider}. It is used at execution for - * {@link org.apache.samza.container.TaskInstance} to retrieve table instances for - * read/write operations. + * the {@link TableSpec}, the {@link TableProvider} and the {@link Table} instance. + * It is used at execution for {@link org.apache.samza.container.TaskInstance} to retrieve + * table instances for read/write operations. * * A {@link TableManager} is constructed from job configuration, the {@link TableSpec} - * and {@link TableProvider} are constructed by processing the job configuration. + * and {@link TableProvider} are constructed by processing the job configuration + * during initialization. The {@link Table} is constructed when {@link #getTable(String)} + * is called and cached. * * After a {@link TableManager} is constructed, local tables are associated with * local store instances created during {@link org.apache.samza.container.SamzaContainer} @@ -51,19 +53,19 @@ import com.google.common.base.Preconditions; * Method {@link TableManager#getTable(String)} will throw {@link IllegalStateException}, * if it's called before initialization. 
* - * For store backed tables, the list of stores must be injected into the constructor. */ public class TableManager { static public class TableCtx { private TableSpec tableSpec; private TableProvider tableProvider; + private Table table; } private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName()); // tableId -> TableCtx - private final Map<String, TableCtx> tables = new HashMap<>(); + private final Map<String, TableCtx> tableContexts = new HashMap<>(); private boolean initialized; @@ -100,7 +102,7 @@ public class TableManager { */ public void init(SamzaContainerContext containerContext, TaskContext taskContext) { Preconditions.checkNotNull(containerContext, "null container context."); - tables.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext)); + tableContexts.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext)); initialized = true; } @@ -109,7 +111,7 @@ public class TableManager { * @param tableSpec the table spec */ private void addTable(TableSpec tableSpec) { - if (tables.containsKey(tableSpec.getId())) { + if (tableContexts.containsKey(tableSpec.getId())) { throw new SamzaException("Table " + tableSpec.getId() + " already exists"); } TableCtx ctx = new TableCtx(); @@ -117,14 +119,14 @@ public class TableManager { Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class); ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec); ctx.tableSpec = tableSpec; - tables.put(tableSpec.getId(), ctx); + tableContexts.put(tableSpec.getId(), ctx); } /** * Shutdown the table manager, internally it shuts down all tables */ public void close() { - tables.values().forEach(ctx -> ctx.tableProvider.close()); + tableContexts.values().forEach(ctx -> ctx.tableProvider.close()); } /** @@ -133,10 +135,14 @@ public class TableManager { * @return table instance */ public Table getTable(String tableId) { - if (!initialized) { - throw new 
IllegalStateException("TableManager has not been initialized."); + Preconditions.checkState(initialized, "TableManager has not been initialized."); + + TableCtx ctx = tableContexts.get(tableId); + Preconditions.checkNotNull(ctx, "Unknown tableId " + tableId); + + if (ctx.table == null) { + ctx.table = ctx.tableProvider.getTable(); } - Preconditions.checkArgument(tables.containsKey(tableId), "Unknown tableId=" + tableId); - return tables.get(tableId).tableProvider.getTable(); + return ctx.table; } } http://git-wip-us.apache.org/repos/asf/samza/blob/479103b2/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java index 24178d0..42f05c0 100644 --- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java +++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java @@ -124,11 +124,14 @@ public class TestTableManager { TableManager tableManager = new TableManager(new MapConfig(map), serdeMap); tableManager.init(mock(SamzaContainerContext.class), mock(TaskContext.class)); - Table table = tableManager.getTable(TABLE_ID); - verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject()); - Assert.assertEquals(DummyTableProviderFactory.table, table); + for (int i = 0; i < 2; i++) { + Table table = tableManager.getTable(TABLE_ID); + verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject()); + verify(DummyTableProviderFactory.tableProvider, times(1)).getTable(); + Assert.assertEquals(DummyTableProviderFactory.table, table); + } - Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tables"); + Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tableContexts"); TableManager.TableCtx ctx = ctxMap.get(TABLE_ID); 
TableSpec tableSpec = getFieldValue(ctx, "tableSpec");
