Repository: tajo Updated Branches: refs/heads/master ab2efce8f -> 95cf4b943
TAJO-1213: Implement CatalogStore::updateTableStats. (jaehwa) Closes #285 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/95cf4b94 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/95cf4b94 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/95cf4b94 Branch: refs/heads/master Commit: 95cf4b9432a02fdbf9880b204c3db718e2bd2468 Parents: ab2efce Author: JaeHwa Jung <[email protected]> Authored: Sun Dec 7 15:38:43 2014 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Sun Dec 7 15:40:24 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/catalog/AbstractCatalogClient.java | 14 ++++ .../src/main/proto/CatalogProtocol.proto | 1 + .../org/apache/tajo/catalog/CatalogService.java | 5 ++ .../src/main/proto/CatalogProtos.proto | 5 ++ .../tajo/catalog/store/HCatalogStore.java | 6 ++ .../org/apache/tajo/catalog/CatalogServer.java | 22 +++++++ .../tajo/catalog/store/AbstractDBStore.java | 68 ++++++++++++++++++++ .../apache/tajo/catalog/store/CatalogStore.java | 4 +- .../org/apache/tajo/catalog/store/MemStore.java | 17 +++++ .../org/apache/tajo/master/GlobalEngine.java | 8 ++- .../apache/tajo/master/querymaster/Query.java | 9 ++- 12 files changed, 156 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 8ecd795..a59e107 100644 --- a/CHANGES +++ b/CHANGES @@ -18,6 +18,8 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1213: Implement CatalogStore::updateTableStats. (jaehwa) + TAJO-1165: Needs to show error messages on query_executor.jsp. (Jihun Kang via jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index 1f1e808..dde6980 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -658,4 +658,18 @@ public abstract class AbstractCatalogClient implements CatalogService { } } + @Override + public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) { + try { + return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + public Boolean call(NettyClientBase client) throws ServiceException { + CatalogProtocolService.BlockingInterface stub = getStub(client); + return stub.updateTableStats(null, updateTableStatsProto).getValue(); + } + }.withRetries(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return false; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto index c5cb528..adf0740 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto +++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto @@ -33,6 +33,7 @@ service CatalogProtocolService { rpc getTablespace(StringProto) returns (TablespaceProto); rpc alterTablespace(AlterTablespaceProto) returns (BoolProto); rpc alterTable(AlterTableDescProto) returns (BoolProto); + rpc updateTableStats(UpdateTableStatsProto) returns (BoolProto); rpc createDatabase(CreateDatabaseRequest) returns (BoolProto); http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java index 667ee88..b41b636 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java @@ -27,6 +27,8 @@ import java.util.Collection; import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto; import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType; import static org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto; +import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; + public interface CatalogService { @@ -181,4 +183,7 @@ public interface CatalogService { * @throws Throwable */ boolean alterTable(AlterTableDesc desc); + + boolean updateTableStats(UpdateTableStatsProto stats); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index f29bc6c..22c08d8 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -280,6 +280,11 @@ message AlterColumnProto { required string newColumnName = 2; } +message UpdateTableStatsProto { + required string tableName = 1; + required TableStatsProto stats = 2; +} + //////////////////////////////////////////////// // Function and UDF Section //////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java index fa1cfd6..ad0aee3 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java @@ -333,6 +333,12 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore { } @Override + public void updateTableStats(CatalogProtos.UpdateTableStatsProto statsProto) throws + CatalogException { + // TODO - not implemented yet + } + + @Override public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) throws CatalogException { throw new CatalogException("tablespace concept is not supported in HCatalogStore"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index 03ae920..57086e2 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -58,6 +58,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand; import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto; +import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; /** * This class provides the catalog service. The catalog service enables clients @@ -366,6 +367,27 @@ public class CatalogServer extends AbstractService { } @Override + public BoolProto updateTableStats(RpcController controller, UpdateTableStatsProto proto) throws + ServiceException { + wlock.lock(); + try { + String [] split = CatalogUtil.splitTableName(proto.getTableName()); + if (!store.existTable(split[0], split[1])) { + throw new NoSuchTableException(proto.getTableName()); + } + store.updateTableStats(proto); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + return BOOL_FALSE; + } finally { + wlock.unlock(); + LOG.info("Table " + proto.getTableName() + " is updated in the catalog (" + + bindAddressStr + ")"); + } + return BOOL_TRUE; + } + + @Override public BoolProto alterTable(RpcController controller, AlterTableDescProto proto) throws ServiceException { wlock.lock(); try { http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index 7c1baab..c7d55eb 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -823,6 +823,74 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo } @Override + public void updateTableStats(final CatalogProtos.UpdateTableStatsProto statsProto) throws + CatalogException { + Connection conn = null; + PreparedStatement pstmt = null; + ResultSet res = null; + + try { + conn = getConnection(); + conn.setAutoCommit(false); + + String[] splitted = CatalogUtil.splitTableName(statsProto.getTableName()); + if (splitted.length == 1) { + throw new IllegalArgumentException("updateTableStats() requires a qualified table name, but it is \"" + + statsProto.getTableName() + "\"."); + } + String databaseName = splitted[0]; + String tableName = splitted[1]; + + int dbid = getDatabaseId(databaseName); + + String tidSql = + "SELECT TID from " + TB_TABLES + " WHERE " + COL_DATABASES_PK + "=? AND " + COL_TABLES_NAME + "=?"; + pstmt = conn.prepareStatement(tidSql); + pstmt.setInt(1, dbid); + pstmt.setString(2, tableName); + res = pstmt.executeQuery(); + + if (!res.next()) { + throw new CatalogException("ERROR: there is no TID matched to " + statsProto.getTableName()); + } + + int tableId = res.getInt("TID"); + res.close(); + pstmt.close(); + + if (statsProto.hasStats()) { + + String statSql = "UPDATE " + TB_STATISTICS + " SET NUM_ROWS = ?, " + + "NUM_BYTES = ? WHERE TID = ?"; + + if (LOG.isDebugEnabled()) { + LOG.debug(statSql); + } + + pstmt = conn.prepareStatement(statSql); + pstmt.setInt(1, tableId); + pstmt.setLong(2, statsProto.getStats().getNumRows()); + pstmt.setLong(3, statsProto.getStats().getNumBytes()); + pstmt.executeUpdate(); + } + + // If there is no error, commit the changes. + conn.commit(); + } catch (SQLException se) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e) { + LOG.error(e); + } + } + throw new CatalogException(se); + } finally { + CatalogUtil.closeQuietly(pstmt, res); + } + } + + @Override public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) throws CatalogException { String[] splitted = CatalogUtil.splitTableName(alterTableDescProto.getTableName()); http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java index 5de9633..041fc52 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java @@ -68,7 +68,9 @@ public interface CatalogStore extends Closeable { void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) throws CatalogException; - /************************ PARTITION METHOD **************************/ + void updateTableStats(CatalogProtos.UpdateTableStatsProto statsProto) throws CatalogException; + + /************************ PARTITION METHOD **************************/ void addPartitionMethod(PartitionMethodProto partitionMethodProto) throws CatalogException; PartitionMethodProto getPartitionMethod(String databaseName, String tableName) http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index ca99160..9575c13 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -172,6 +172,23 @@ public class MemStore implements CatalogStore { } @Override + public void updateTableStats(CatalogProtos.UpdateTableStatsProto request) throws CatalogException { + String [] splitted = CatalogUtil.splitTableName(request.getTableName()); + if (splitted.length == 1) { + throw new IllegalArgumentException("createTable() requires a qualified table name, but it is \"" + + request.getTableName() + "\"."); + } + String databaseName = splitted[0]; + String tableName = splitted[1]; + + final Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName); + final CatalogProtos.TableDescProto tableDescProto = database.get(tableName); + CatalogProtos.TableDescProto newTableDescProto = tableDescProto.toBuilder().setStats(request + .getStats().toBuilder()).build(); + database.put(tableName, newTableDescProto); + } + + @Override public boolean existTable(String dbName, String tbName) throws CatalogException { Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, dbName); http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 9bf9a75..821d440 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -74,6 +74,7 @@ import java.util.List; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto; +import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet; import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; @@ -398,8 +399,11 @@ public class GlobalEngine extends AbstractService { stats.setNumBytes(volume); stats.setNumRows(1); - catalog.dropTable(insertNode.getTableName()); - catalog.createTable(tableDesc); + UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder(); + builder.setTableName(tableDesc.getName()); + builder.setStats(stats.getProto()); + + catalog.updateTableStats(builder.build()); responseBuilder.setTableDesc(tableDesc.getProto()); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 6f80171..f92001f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -34,6 +34,8 @@ import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos.QueryState; +import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; @@ -916,8 +918,11 @@ public class Query implements EventHandler<QueryEvent> { finalTable.setStats(stats); if (insertNode.hasTargetTable()) { - catalog.dropTable(insertNode.getTableName()); - catalog.createTable(finalTable); + UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder(); + builder.setTableName(finalTable.getName()); + builder.setStats(stats.getProto()); + + catalog.updateTableStats(builder.build()); } query.setResultDesc(finalTable);
