Repository: incubator-impala
Updated Branches:
  refs/heads/master 13455b5a2 -> 838c1f544
IMPALA-1702: Enforce single-table consistency in query analysis.

Catalogd-managed table id generation causes problems when new updates arrive
at the frontend while queries are being analyzed: references to the same
table may suddenly refer to a different version, and different tables may
share the same table id. One table can then overwrite another in the thrift
descriptor table sent to the backend.

This commit removes the table id from the catalog Table object; instead, the
frontend assigns a unique id to each table in DescriptorTable.toThrift(). It
also implements a referencedTables_ cache in Analyzer.globalState_ so that
calling Analyzer.getTable() on the same table returns the same reference
throughout a query.

Change-Id: Ifad648b72684ae495ec387590ab1bc58ce5b39e2
Reviewed-on: http://gerrit.cloudera.org:8080/4349
Tested-by: Internal Jenkins
Reviewed-by: Alex Behm <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/01e7b110
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/01e7b110
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/01e7b110

Branch: refs/heads/master
Commit: 01e7b1101523a668b5959462490918ae1a312452
Parents: 13455b5
Author: Huaisi Xu <[email protected]>
Authored: Thu Sep 1 17:26:02 2016 -0700
Committer: Alex Behm <[email protected]>
Committed: Tue Oct 25 21:56:11 2016 +0000

----------------------------------------------------------------------
 common/thrift/CatalogObjects.thrift             | 23 +++---
 common/thrift/Descriptors.thrift                |  1 +
 .../org/apache/impala/analysis/Analyzer.java    | 19 ++++-
 .../analysis/CreateTableAsSelectStmt.java       |  9 +--
 .../apache/impala/analysis/DescriptorTable.java | 80 ++++++++++++--------
 .../org/apache/impala/analysis/InsertStmt.java  |  2 +-
 .../org/apache/impala/analysis/ModifyStmt.java  |  2 +
 .../apache/impala/analysis/TupleDescriptor.java | 12 ++-
 .../impala/catalog/CatalogServiceCatalog.java   | 18 +----
 .../apache/impala/catalog/DataSourceTable.java  |  9 +--
 .../org/apache/impala/catalog/HBaseTable.java   |  8 +-
 .../org/apache/impala/catalog/HdfsTable.java    |  8 +-
 .../apache/impala/catalog/IncompleteTable.java  | 18 ++---
 .../org/apache/impala/catalog/KuduTable.java    | 11 ++-
 .../java/org/apache/impala/catalog/Table.java   | 31 +++-----
 .../java/org/apache/impala/catalog/TableId.java | 42 ----------
 .../org/apache/impala/catalog/TableLoader.java  |  9 +--
 .../apache/impala/catalog/TableLoadingMgr.java  |  4 +-
 .../java/org/apache/impala/catalog/View.java    | 12 +--
 .../main/java/org/apache/impala/common/Id.java  |  2 -
 .../apache/impala/planner/HBaseTableSink.java   |  3 +-
 .../apache/impala/planner/HdfsTableSink.java    |  3 +-
 .../org/apache/impala/planner/JoinTableId.java  |  6 +-
 .../apache/impala/planner/KuduTableSink.java    |  3 +-
 .../org/apache/impala/service/Frontend.java     | 33 +-------
 .../apache/impala/common/FrontendTestBase.java  |  4 +-
 .../apache/impala/planner/PlannerTestBase.java  | 49 ++++++++++++
 27 files changed, 203 insertions(+), 218 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 78aa19d..b090aee 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -399,39 +399,36 @@ struct TTable {
   // string pointing to where the metadata loading error occurred.
   3: optional Status.TStatus load_status
 
-  // Table identifier.
-  4: optional Types.TTableId id
-
   // The access level Impala has on this table (READ_WRITE, READ_ONLY, etc).
-  5: optional TAccessLevel access_level
+  4: optional TAccessLevel access_level
 
   // List of columns (excludes clustering columns)
-  6: optional list<TColumn> columns
+  5: optional list<TColumn> columns
 
   // List of clustering columns (empty list if table has no clustering columns)
-  7: optional list<TColumn> clustering_columns
+  6: optional list<TColumn> clustering_columns
 
   // Table stats data for the table.
-  8: optional TTableStats table_stats
+  7: optional TTableStats table_stats
 
   // Determines the table type - either HDFS, HBASE, or VIEW.
-  9: optional TTableType table_type
+  8: optional TTableType table_type
 
   // Set iff this is an HDFS table
-  10: optional THdfsTable hdfs_table
+  9: optional THdfsTable hdfs_table
 
   // Set iff this is an Hbase table
-  11: optional THBaseTable hbase_table
+  10: optional THBaseTable hbase_table
 
   // The Hive Metastore representation of this table. May not be set if there were
   // errors loading the table metadata
-  12: optional hive_metastore.Table metastore_table
+  11: optional hive_metastore.Table metastore_table
 
   // Set iff this is a table from an external data source
-  13: optional TDataSourceTable data_source_table
+  12: optional TDataSourceTable data_source_table
 
   // Set iff this a kudu table
-  14: optional TKuduTable kudu_table
+  13: optional TKuduTable kudu_table
 }
 
 // Represents a database.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/common/thrift/Descriptors.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Descriptors.thrift b/common/thrift/Descriptors.thrift
index 1077bb6..c3f1397 100644
--- a/common/thrift/Descriptors.thrift
+++ b/common/thrift/Descriptors.thrift
@@ -58,6 +58,7 @@ struct TColumnDescriptor {
 
 // "Union" of all table types.
 struct TTableDescriptor {
+  // Query-local id assigned in DescriptorTable.toThrift()
  1: required Types.TTableId id
  2: required CatalogObjects.TTableType tableType
 
  // Clustering/partition columns come first.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index f9909b1..b630a6e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
@@ -290,6 +291,11 @@ public class Analyzer {
     // Decreases the size of the scan range locations.
     private final ListMap<TNetworkAddress> hostIndex = new ListMap<TNetworkAddress>();
 
+    // The Impalad Catalog has the latest tables from the statestore. In order to use the
+    // same version of a table in a single query, we cache all referenced tables here.
+    // TODO: Investigate what to do with other catalog objects.
+    private final HashMap<TableName, Table> referencedTables_ = Maps.newHashMap();
+
     // Timeline of important events in the planning process, used for debugging /
     // profiling
     private final EventSequence timeline = new EventSequence("Planner Timeline");
@@ -2277,7 +2283,9 @@ public class Analyzer {
   }
 
   /**
-   * Returns the Catalog Table object for the given database and table name.
+   * Returns the Catalog Table object for the given database and table name. A table
+   * referenced for the first time is cached in globalState_.referencedTables_. The same
+   * table instance is returned for all subsequent references in the same query.
    * Adds the table to this analyzer's "missingTbls_" and throws an AnalysisException if
    * the table has not yet been loaded in the local catalog cache.
    * Throws an AnalysisException if the table or the db does not exist in the Catalog.
@@ -2285,7 +2293,13 @@
    */
   public Table getTable(String dbName, String tableName)
       throws AnalysisException, TableLoadingException {
-    Table table = null;
+    TableName tblName = new TableName(dbName, tableName);
+    Table table = globalState_.referencedTables_.get(tblName);
+    if (table != null) {
+      // Return query-local version of table.
+      Preconditions.checkState(table.isLoaded());
+      return table;
+    }
     try {
       table = getCatalog().getTable(dbName, tableName);
     } catch (DatabaseNotFoundException e) {
@@ -2307,6 +2321,7 @@
       throw new AnalysisException(
           "Table/view is missing metadata: " + table.getFullName());
     }
+    globalState_.referencedTables_.put(tblName, table);
     return table;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 816af80..f7f8417 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -26,7 +26,6 @@ import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.Table;
-import org.apache.impala.catalog.TableId;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.THdfsFileFormat;
@@ -187,12 +186,6 @@ public class CreateTableAsSelectStmt extends StatementBase {
       // user specified a location for the table this will be a no-op.
       msTbl.getSd().setLocation(analyzer.getCatalog().getTablePath(msTbl).toString());
 
-      // Create a "temp" table based off the given metastore.api.Table object. Normally,
-      // the CatalogService assigns all table IDs, but in this case we need to assign the
-      // "temp" table an ID locally. This table ID cannot conflict with any table in the
-      // SelectStmt (or the BE will be very confused). To ensure the ID is unique within
-      // this query, just assign it the invalid table ID. The CatalogServer will assign
-      // this table a proper ID once it is created there as part of the CTAS execution.
       Table tmpTable = null;
       if (KuduTable.isKuduTable(msTbl)) {
         tmpTable = KuduTable.createCtasTarget(db, msTbl, createStmt_.getColumnDefs(),
@@ -200,7 +193,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
       } else {
         // TODO: Creating a tmp table using load() is confusing.
         // Refactor it to use a 'createCtasTarget()' function similar to Kudu table.
-        tmpTable = Table.fromMetastoreTable(TableId.createInvalidId(), db, msTbl);
+        tmpTable = Table.fromMetastoreTable(db, msTbl);
         tmpTable.load(true, client.getHiveClient(), msTbl);
       }
       Preconditions.checkState(tmpTable != null &&

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
index 22764ea..7040be6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
@@ -36,7 +36,6 @@ import org.apache.impala.thrift.TDescriptorTable;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * Repository for tuple (and slot) descriptors.
@@ -48,12 +47,17 @@ public class DescriptorTable {
   private final HashMap<SlotId, SlotDescriptor> slotDescs_ = Maps.newHashMap();
   private final IdGenerator<TupleId> tupleIdGenerator_ = TupleId.createGenerator();
   private final IdGenerator<SlotId> slotIdGenerator_ = SlotId.createGenerator();
-  // List of referenced tables with no associated TupleDescriptor to ship to the BE.
-  // For example, the output table of an insert query.
-  private final List<Table> referencedTables_ = Lists.newArrayList();
+  // The target table of a table sink; may be null. Table id 0 is reserved for it.
+  // Set during the analysis of statements that produce a table sink, e.g.,
+  // InsertStmt.analyze() and ModifyStmt.analyze().
+  private Table targetTable_;
   // For each table, the set of partitions that are referenced by at least one scan range.
   private final HashMap<Table, HashSet<Long>> referencedPartitionsPerTable_ =
       Maps.newHashMap();
+  // 0 is reserved for table sinks.
+  public static final int TABLE_SINK_ID = 0;
+  // Table id counter for a single query.
+  private int nextTableId_ = TABLE_SINK_ID + 1;
 
   public TupleDescriptor createTupleDescriptor(String debugName) {
     TupleDescriptor d = new TupleDescriptor(tupleIdGenerator_.getNextId(), debugName);
@@ -102,9 +106,8 @@ public class DescriptorTable {
   public TupleId getMaxTupleId() { return tupleIdGenerator_.getMaxId(); }
   public SlotId getMaxSlotId() { return slotIdGenerator_.getMaxId(); }
 
-  public void addReferencedTable(Table table) {
-    referencedTables_.add(table);
-  }
+  public Table getTargetTable() { return targetTable_; }
+  public void setTargetTable(Table table) { targetTable_ = table; }
 
   /**
    * Find the set of referenced partitions for the given table. Allocates a set if
@@ -156,38 +159,55 @@ public class DescriptorTable {
     for (TupleDescriptor d: tupleDescs_.values()) d.computeMemLayout();
   }
 
+  /**
+   * Returns the thrift representation of this DescriptorTable. Assigns unique ids to
+   * all distinct tables and sets them in tuple descriptors as necessary.
+   */
   public TDescriptorTable toThrift() {
     TDescriptorTable result = new TDescriptorTable();
-    HashSet<Table> referencedTbls = Sets.newHashSet();
-    HashSet<Table> allPartitionsTbls = Sets.newHashSet();
+    // Maps from a base table to the table id used for it in the backend.
+    HashMap<Table, Integer> tableIdMap = Maps.newHashMap();
+    // Used to check table-level consistency.
+    HashMap<TableName, Table> referencedTables = Maps.newHashMap();
+
+    if (targetTable_ != null) {
+      tableIdMap.put(targetTable_, TABLE_SINK_ID);
+      referencedTables.put(targetTable_.getTableName(), targetTable_);
+    }
     for (TupleDescriptor tupleDesc: tupleDescs_.values()) {
       // inline view of a non-constant select has a non-materialized tuple descriptor
       // in the descriptor table just for type checking, which we need to skip
-      if (tupleDesc.isMaterialized()) {
-        // TODO: Ideally, we should call tupleDesc.checkIsExecutable() here, but there
-        // currently are several situations in which we send materialized tuples without
-        // a mem layout to the BE, e.g., when unnesting unions or when replacing plan
-        // trees with an EmptySetNode.
-        result.addToTupleDescriptors(tupleDesc.toThrift());
-        Table table = tupleDesc.getTable();
-        if (table != null && !(table instanceof View)) referencedTbls.add(table);
-        // Only serialize materialized slots
-        for (SlotDescriptor slotD: tupleDesc.getMaterializedSlots()) {
-          result.addToSlotDescriptors(slotD.toThrift());
+      if (!tupleDesc.isMaterialized()) continue;
+      Table table = tupleDesc.getTable();
+      Integer tableId = tableIdMap.get(table);
+      if (table != null && !(table instanceof View)) {
+        TableName tblName = table.getTableName();
+        // Verify table-level consistency in the same query by checking that references
+        // to the same table name refer to the same table instance.
+        Table checkTable = referencedTables.get(tblName);
+        Preconditions.checkState(checkTable == null || table == checkTable);
+        if (tableId == null) {
+          tableId = nextTableId_++;
+          tableIdMap.put(table, tableId);
+          referencedTables.put(tblName, table);
         }
       }
+      // TODO: Ideally, we should call tupleDesc.checkIsExecutable() here, but there
+      // currently are several situations in which we send materialized tuples without
+      // a mem layout to the BE, e.g., when unnesting unions or when replacing plan
+      // trees with an EmptySetNode.
+      result.addToTupleDescriptors(tupleDesc.toThrift(tableId));
+      // Only serialize materialized slots
+      for (SlotDescriptor slotD: tupleDesc.getMaterializedSlots()) {
+        result.addToSlotDescriptors(slotD.toThrift());
+      }
     }
-    for (Table table: referencedTables_) {
-      referencedTbls.add(table);
-      // We don't know which partitions are needed for INSERT, so include them all.
-      allPartitionsTbls.add(table);
-    }
-    for (Table tbl: referencedTbls) {
+    for (Table tbl: tableIdMap.keySet()) {
       HashSet<Long> referencedPartitions = null; // null means include all partitions.
-      if (!allPartitionsTbls.contains(tbl)) {
-        referencedPartitions = getReferencedPartitions(tbl);
-      }
-      result.addToTableDescriptors(tbl.toThriftDescriptor(referencedPartitions));
+      // We don't know which partitions are needed for INSERT, so do not prune partitions.
+      if (tbl != targetTable_) referencedPartitions = getReferencedPartitions(tbl);
+      result.addToTableDescriptors(
+          tbl.toThriftDescriptor(tableIdMap.get(tbl), referencedPartitions));
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 250d2d2..b80fcde 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -421,7 +421,7 @@ public class InsertStmt extends StatementBase {
     }
 
     // Add target table to descriptor table.
-    analyzer.getDescTbl().addReferencedTable(table_);
+    analyzer.getDescTbl().setTargetTable(table_);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 28de1a8..be22f02 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -158,6 +158,8 @@ public abstract class ModifyStmt extends StatementBase {
     // Validates the assignments_ and creates the sourceStmt_.
     if (sourceStmt_ == null) createSourceStmt(analyzer);
     sourceStmt_.analyze(analyzer);
+    // Add target table to descriptor table.
+    analyzer.getDescTbl().setTargetTable(table_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index e5462fd..5fbe5f6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -204,15 +204,13 @@ public class TupleDescriptor {
     }
   }
 
-  public TTupleDescriptor toThrift() {
+  public TTupleDescriptor toThrift(Integer tableId) {
     TTupleDescriptor ttupleDesc =
         new TTupleDescriptor(id_.asInt(), byteSize_, numNullBytes_);
-    // do not set the table id or tuple path for views
-    if (getTable() != null && !(getTable() instanceof View)) {
-      ttupleDesc.setTableId(getTable().getId().asInt());
-      Preconditions.checkNotNull(path_);
-      ttupleDesc.setTuplePath(path_.getAbsolutePath());
-    }
+    if (tableId == null) return ttupleDesc;
+    ttupleDesc.setTableId(tableId);
+    Preconditions.checkNotNull(path_);
+    ttupleDesc.setTuplePath(path_.getAbsolutePath());
     return ttupleDesc;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 149b00b..10f6e4c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.UUID;
 
@@ -132,8 +131,6 @@ public class CatalogServiceCatalog extends Catalog {
   // sequence number assigned to catalog objects.
   private long catalogVersion_ = INITIAL_CATALOG_VERSION;
 
-  protected final AtomicInteger nextTableId_ = new AtomicInteger(0);
-
   // Manages the scheduling of background table loading.
   private final TableLoadingMgr tableLoadingMgr_;
 
@@ -591,8 +588,7 @@ public class CatalogServiceCatalog extends Catalog {
       List<TTableName> tblsToBackgroundLoad = Lists.newArrayList();
       for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
-        Table incompleteTbl = IncompleteTable.createUninitializedTable(
-            getNextTableId(), newDb, tableName);
+        Table incompleteTbl = IncompleteTable.createUninitializedTable(newDb, tableName);
         incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
         newDb.addTable(incompleteTbl);
         if (loadInBackground_) {
@@ -624,8 +620,6 @@ public class CatalogServiceCatalog extends Catalog {
 
     catalogLock_.writeLock().lock();
     try {
-      nextTableId_.set(0);
-
       // Not all Java UDFs are persisted to the metastore. The ones which aren't
       // should be restored once the catalog has been invalidated.
       Map<String, Db> oldDbCache = dbCache_.get();
@@ -691,8 +685,7 @@ public class CatalogServiceCatalog extends Catalog {
   public Table addTable(String dbName, String tblName) {
     Db db = getDb(dbName);
     if (db == null) return null;
-    Table incompleteTable =
-        IncompleteTable.createUninitializedTable(getNextTableId(), db, tblName);
+    Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
     incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
     db.addTable(incompleteTable);
     return db.getTable(tblName);
@@ -1046,8 +1039,7 @@ public class CatalogServiceCatalog extends Catalog {
       // Add a new uninitialized table to the table cache, effectively invalidating
       // any existing entry. The metadata for the table will be loaded lazily, on the
       // next access to the table.
-      Table newTable = IncompleteTable.createUninitializedTable(
-          getNextTableId(), db, tblName);
+      Table newTable = IncompleteTable.createUninitializedTable(db, tblName);
       newTable.setCatalogVersion(incrementAndGetCatalogVersion());
       db.addTable(newTable);
       if (loadInBackground_) {
@@ -1209,10 +1201,6 @@ public class CatalogServiceCatalog extends Catalog {
 
   public ReentrantReadWriteLock getLock() { return catalogLock_; }
 
-  /**
-   * Gets the next table ID and increments the table ID counter.
-   */
-  public TableId getNextTableId() { return new TableId(nextTableId_.getAndIncrement()); }
   public SentryProxy getSentryProxy() { return sentryProxy_; }
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
index 25fe10d..dab0c05 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
@@ -77,10 +77,9 @@ public class DataSourceTable extends Table {
   private String initString_;
   private TDataSource dataSource_;
 
-  protected DataSourceTable(
-      TableId id, org.apache.hadoop.hive.metastore.api.Table msTable,
+  protected DataSourceTable(org.apache.hadoop.hive.metastore.api.Table msTable,
       Db db, String name, String owner) {
-    super(id, msTable, db, name, owner);
+    super(msTable, db, name, owner);
   }
 
   /**
@@ -221,8 +220,8 @@ public class DataSourceTable extends Table {
   }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
-    TTableDescriptor tableDesc = new TTableDescriptor(id_.asInt(),
+  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
+    TTableDescriptor tableDesc = new TTableDescriptor(tableId,
         TTableType.DATA_SOURCE_TABLE, getTColumnDescriptors(), numClusteringCols_,
         name_, db_.getName());
     tableDesc.setDataSourceTable(getDataSourceTable());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index 2d1fc78..3af1160 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -129,9 +129,9 @@ public class HBaseTable extends Table {
   // (see IMPALA-4211).
   private HColumnDescriptor[] columnFamilies_ = null;
 
-  protected HBaseTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTbl,
+  protected HBaseTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
       Db db, String name, String owner) {
-    super(id, msTbl, db, name, owner);
+    super(msTbl, db, name, owner);
   }
 
   /**
@@ -662,9 +662,9 @@ public class HBaseTable extends Table {
   }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
+  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
     TTableDescriptor tableDescriptor =
-        new TTableDescriptor(id_.asInt(), TTableType.HBASE_TABLE,
+        new TTableDescriptor(tableId, TTableType.HBASE_TABLE,
             getTColumnDescriptors(), numClusteringCols_, hbaseTableName_, db_.getName());
     tableDescriptor.setHbaseTable(getTHBaseTable());
     return tableDescriptor;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index e6206fc..97831d7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -266,9 +266,9 @@ public class HdfsTable extends Table {
     }
   }
 
-  public HdfsTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTbl,
+  public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
       Db db, String name, String owner) {
-    super(id, msTbl, db, name, owner);
+    super(msTbl, db, name, owner);
     partitionLocationCompressor_ =
         new HdfsPartitionLocationCompressor(numClusteringCols_);
   }
@@ -1567,10 +1567,10 @@ public class HdfsTable extends Table {
   }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
+  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
     // Create thrift descriptors to send to the BE. The BE does not
     // need any information below the THdfsPartition level.
-    TTableDescriptor tableDesc = new TTableDescriptor(id_.asInt(), TTableType.HDFS_TABLE,
+    TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
         getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
     tableDesc.setHdfsTable(getTHdfsTable(false, referencedPartitions));
     return tableDesc;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
index ec42036..0cf89ab 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
@@ -43,9 +43,9 @@ public class IncompleteTable extends Table {
   // its metadata loaded).
   private ImpalaException cause_;
 
-  private IncompleteTable(TableId id, Db db, String name,
+  private IncompleteTable(Db db, String name,
       ImpalaException cause) {
-    super(id, null, db, name, null);
+    super(null, db, name, null);
     cause_ = cause;
   }
 
@@ -65,7 +65,7 @@ public class IncompleteTable extends Table {
   public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
+  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
     throw new IllegalStateException(cause_);
   }
 
@@ -82,7 +82,6 @@ public class IncompleteTable extends Table {
   @Override
   public TTable toThrift() {
     TTable table = new TTable(db_.getName(), name_);
-    table.setId(id_.asInt());
     if (cause_ != null) {
       table.setLoad_status(new TStatus(TErrorCode.INTERNAL_ERROR,
           Lists.newArrayList(JniUtil.throwableToString(cause_),
@@ -122,13 +121,12 @@ public class IncompleteTable extends Table {
     }
   }
 
-  public static IncompleteTable createUninitializedTable(TableId id, Db db,
-      String name) {
-    return new IncompleteTable(id, db, name, null);
+  public static IncompleteTable createUninitializedTable(Db db, String name) {
+    return new IncompleteTable(db, name, null);
   }
 
-  public static IncompleteTable createFailedMetadataLoadTable(TableId id, Db db,
-      String name, ImpalaException e) {
-    return new IncompleteTable(id, db, name, e);
+  public static IncompleteTable createFailedMetadataLoadTable(Db db, String name,
+      ImpalaException e) {
+    return new IncompleteTable(db, name, e);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index d0185b7..7d78715 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -115,9 +115,9 @@ public class KuduTable extends Table {
   // supported.
   private final List<DistributeParam> distributeBy_ = Lists.newArrayList();
 
-  protected KuduTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTable,
+  protected KuduTable(org.apache.hadoop.hive.metastore.api.Table msTable,
       Db db, String name, String owner) {
-    super(id, msTable, db, name, owner);
+    super(msTable, db, name, owner);
     kuduTableName_ = msTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
     kuduMasters_ = msTable.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
   }
@@ -261,8 +261,7 @@ public class KuduTable extends Table {
   public static KuduTable createCtasTarget(Db db,
       org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
       List<String> primaryKeyColumnNames, List<DistributeParam> distributeParams) {
-    KuduTable tmpTable = new KuduTable(TableId.createInvalidId(), msTbl, db,
-        msTbl.getTableName(), msTbl.getOwner());
+    KuduTable tmpTable = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     int pos = 0;
     for (ColumnDef colDef: columnDefs) {
       tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++));
@@ -308,8 +307,8 @@ public class KuduTable extends Table {
   }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
-    TTableDescriptor desc = new TTableDescriptor(id_.asInt(), TTableType.KUDU_TABLE,
+  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
+    TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.KUDU_TABLE,
         getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName());
     desc.setKuduTable(getTKuduTable());
     return desc;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 4b40b44..bef381a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -59,13 +59,9 @@ import com.google.common.collect.Maps;
 public abstract class Table implements CatalogObject {
   private static final Logger LOG = Logger.getLogger(Table.class);
 
-  // Lock used to serialize calls to the Hive MetaStore to work around MetaStore
-  // concurrency bugs. Currently used to serialize calls to "getTable()" due to HIVE-5457.
-  private static final Object metastoreAccessLock_ = new Object();
   private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
   protected org.apache.hadoop.hive.metastore.api.Table msTable_;
 
-  protected final TableId id_;
   protected final Db db_;
   protected final String name_;
   protected final String owner_;
@@ -95,9 +91,8 @@ public abstract class Table implements CatalogObject {
   protected static EnumSet<TableType> SUPPORTED_TABLE_TYPES = EnumSet.of(
       TableType.EXTERNAL_TABLE, TableType.MANAGED_TABLE, TableType.VIRTUAL_VIEW);
 
-  protected Table(TableId id, org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
+  protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
       String name, String owner) {
-    id_ = id;
     msTable_ = msTable;
     db_ = db;
     name_ = name.toLowerCase();
@@ -106,7 +101,8 @@ public abstract class Table implements CatalogObject {
         CatalogServiceCatalog.getLastDdlTime(msTable_) : -1;
   }
 
-  public abstract TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions);
+  public abstract TTableDescriptor toThriftDescriptor(
+      int tableId, Set<Long> referencedPartitions);
   public abstract TCatalogObjectType getCatalogObjectType();
 
   /**
@@ -209,24 +205,24 @@ public abstract class Table implements CatalogObject {
    * Creates a table of the appropriate type based on the given hive.metastore.api.Table
    * object.
    */
-  public static Table fromMetastoreTable(TableId id, Db db,
+  public static Table fromMetastoreTable(Db db,
       org.apache.hadoop.hive.metastore.api.Table msTbl) {
     // Create a table of appropriate type
     Table table = null;
     if (TableType.valueOf(msTbl.getTableType()) == TableType.VIRTUAL_VIEW) {
-      table = new View(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
+      table = new View(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     } else if (HBaseTable.isHBaseTable(msTbl)) {
-      table = new HBaseTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
+      table = new HBaseTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     } else if (KuduTable.isKuduTable(msTbl)) {
-      table = new KuduTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
+      table = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     } else if (DataSourceTable.isDataSourceTable(msTbl)) {
       // It's important to check if this is a DataSourceTable before HdfsTable because
       // DataSourceTables are still represented by HDFS tables in the metastore but
       // have a special table property to indicate that Impala should use an external
       // data source.
-      table = new DataSourceTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
+      table = new DataSourceTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     } else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) {
-      table = new HdfsTable(id, msTbl, db, msTbl.getTableName(), msTbl.getOwner());
+      table = new HdfsTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     }
     return table;
   }
@@ -239,11 +235,10 @@ public abstract class Table implements CatalogObject {
       throws TableLoadingException {
     Table newTable;
     if (!thriftTable.isSetLoad_status() && thriftTable.isSetMetastore_table()) {
-      newTable = Table.fromMetastoreTable(new TableId(thriftTable.getId()),
-          parentDb, thriftTable.getMetastore_table());
+      newTable = Table.fromMetastoreTable(parentDb, thriftTable.getMetastore_table());
     } else {
-      newTable = IncompleteTable.createUninitializedTable(
-          TableId.createInvalidId(), parentDb, thriftTable.getTbl_name());
+      newTable =
+          IncompleteTable.createUninitializedTable(parentDb, thriftTable.getTbl_name());
     }
     newTable.loadFromThrift(thriftTable);
     newTable.validate();
@@ -291,7 +286,6 @@ public abstract class Table implements CatalogObject {
 
   public TTable toThrift() {
     TTable table = new TTable(db_.getName(), name_);
-    table.setId(id_.asInt());
     table.setAccess_level(accessLevel_);
 
     // Populate both regular columns and clustering columns (if there are any).
@@ -440,7 +434,6 @@ public abstract class Table implements CatalogObject {
   }
 
   public int getNumClusteringCols() { return numClusteringCols_; }
-  public TableId getId() { return id_; }
   public long getNumRows() { return numRows_; }
   public ArrayType getType() { return type_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/TableId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableId.java b/fe/src/main/java/org/apache/impala/catalog/TableId.java
deleted file mode 100644
index 2b1dabc..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/TableId.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog;
-
-import org.apache.impala.common.Id;
-import org.apache.impala.common.IdGenerator;
-
-public class TableId extends Id<TableId> {
-  // Construction only allowed via an IdGenerator.
-  protected TableId(int id) {
-    super(id);
-  }
-
-  public static IdGenerator<TableId> createGenerator() {
-    return new IdGenerator<TableId>() {
-      @Override
-      public TableId getNextId() { return new TableId(nextId_++); }
-      @Override
-      public TableId getMaxId() { return new TableId(nextId_ - 1); }
-    };
-  }
-
-  /**
-   * Returns an invalid table id intended for temporary use, e.g., for CTAS.
-   */
-  public static TableId createInvalidId() { return new TableId(INVALID_ID); }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 8541a3a..c3ae2ba 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -72,7 +72,7 @@ public class TableLoader {
       }
 
       // Create a table of appropriate type and have it load itself
-      table = Table.fromMetastoreTable(catalog_.getNextTableId(), db, msTbl);
+      table = Table.fromMetastoreTable(db, msTbl);
       if (table == null) {
         throw new TableLoadingException(
             "Unrecognized table type for table: " + fullTblName);
@@ -80,18 +80,17 @@ public class TableLoader {
       table.load(false, msClient.getHiveClient(), msTbl);
       table.validate();
     } catch (TableLoadingException e) {
-      table = IncompleteTable.createFailedMetadataLoadTable(
-          TableId.createInvalidId(), db, tblName, e);
+      table = IncompleteTable.createFailedMetadataLoadTable(db, tblName, e);
     } catch (NoSuchObjectException e) {
       TableLoadingException tableDoesNotExist = new TableLoadingException(
           "Table " + fullTblName + " no longer exists in the Hive MetaStore. " +
           "Run 'invalidate metadata " + fullTblName + "' to update the Impala " +
           "catalog.");
       table = IncompleteTable.createFailedMetadataLoadTable(
-          TableId.createInvalidId(), db, tblName, tableDoesNotExist);
+          db, tblName, tableDoesNotExist);
     } catch (Exception e) {
       table = IncompleteTable.createFailedMetadataLoadTable(
-          catalog_.getNextTableId(), db, tblName, new TableLoadingException(
+          db, tblName, new TableLoadingException(
           "Failed to load metadata for table: " + fullTblName + ". Running " +
Running " + "'invalidate metadata " + fullTblName + "' may resolve this problem.", e)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java index 5f64b6d..84de5c4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java +++ b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java @@ -72,8 +72,8 @@ public class TableLoadingMgr { tbl = tblTask_.get(); } catch (Exception e) { tbl = IncompleteTable.createFailedMetadataLoadTable( - TableId.createInvalidId(), catalog_.getDb(tblName_.getDb_name()), - tblName_.getTable_name(), new TableLoadingException(e.getMessage(), e)); + catalog_.getDb(tblName_.getDb_name()), tblName_.getTable_name(), + new TableLoadingException(e.getMessage(), e)); } Preconditions.checkState(tbl.isLoaded()); return tbl; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/catalog/View.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/View.java b/fe/src/main/java/org/apache/impala/catalog/View.java index 46f089e..f437c58 100644 --- a/fe/src/main/java/org/apache/impala/catalog/View.java +++ b/fe/src/main/java/org/apache/impala/catalog/View.java @@ -71,9 +71,9 @@ public class View extends Table { // Set if this View is from a WITH clause with column labels. private List<String> colLabels_; - public View(TableId id, org.apache.hadoop.hive.metastore.api.Table msTable, + public View(org.apache.hadoop.hive.metastore.api.Table msTable, Db db, String name, String owner) { - super(id, msTable, db, name, owner); + super(msTable, db, name, owner); isLocalView_ = false; } @@ -82,17 +82,17 @@ public class View extends Table { * list of column labels. */ public View(String alias, QueryStmt queryStmt, List<String> colLabels) { - super(null, null, null, alias, null); + super(null, null, alias, null); isLocalView_ = true; queryStmt_ = queryStmt; colLabels_ = colLabels; } /** - * Creates a view for testig purposes. + * Creates a view for testing purposes. */ private View(Db db, String name, QueryStmt queryStmt) { - super(null, null, db, name, null); + super(null, db, name, null); isLocalView_ = false; queryStmt_ = queryStmt; colLabels_ = null; @@ -190,7 +190,7 @@ public class View extends Table { public boolean hasColLabels() { return colLabels_ != null; } @Override - public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) { + public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) { throw new IllegalStateException("Cannot call toThriftDescriptor() on a view."); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/common/Id.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/common/Id.java b/fe/src/main/java/org/apache/impala/common/Id.java index 3eee186..c7357aa 100644 --- a/fe/src/main/java/org/apache/impala/common/Id.java +++ b/fe/src/main/java/org/apache/impala/common/Id.java @@ -27,14 +27,12 @@ import com.google.common.collect.Lists; * Integer ids that cannot accidentally be compared with ints. 
  */
 public class Id<IdType extends Id<IdType>> implements Comparable<Id<IdType>> {
-  static protected int INVALID_ID = -1;
   protected final int id_;
 
   public Id(int id) {
     this.id_ = id;
   }
 
-  public boolean isValid() { return id_ != INVALID_ID; }
   public int asInt() { return id_; }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
index aa77529..1d7994b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.impala.planner;
 
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.thrift.TDataSink;
@@ -51,7 +52,7 @@ public class HBaseTableSink extends TableSink {
   @Override
   protected TDataSink toThrift() {
     TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK);
-    TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(),
+    TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
         TTableSinkType.HBASE, sinkOp_.toThrift());
     result.table_sink = tTableSink;
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 3a217b9..bb06b5e 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -19,6 +19,7 @@ package org.apache.impala.planner;
 
 import java.util.List;
 
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
@@ -148,7 +149,7 @@ public class HdfsTableSink extends TableSink {
     if (skipHeaderLineCount > 0) {
       hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount);
     }
-    TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(),
+    TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
         TTableSinkType.HDFS, sinkOp_.toThrift());
     tTableSink.hdfs_table_sink = hdfsTableSink;
     result.table_sink = tTableSink;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/planner/JoinTableId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinTableId.java b/fe/src/main/java/org/apache/impala/planner/JoinTableId.java
index 5f96c90..92a2491 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinTableId.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinTableId.java
@@ -21,6 +21,8 @@ import org.apache.impala.common.Id;
 import org.apache.impala.common.IdGenerator;
 
 public class JoinTableId extends Id<JoinTableId> {
+  static int INVALID_JOIN_TABLE_ID = -1;
+
   // Construction only allowed via an IdGenerator.
   protected JoinTableId(int id) {
     super(id);
@@ -28,9 +30,11 @@ public class JoinTableId extends Id<JoinTableId> {
 
   public static JoinTableId INVALID;
   static {
-    INVALID = new JoinTableId(Id.INVALID_ID);
+    INVALID = new JoinTableId(INVALID_JOIN_TABLE_ID);
   }
 
+  public boolean isValid() { return id_ != INVALID_JOIN_TABLE_ID; }
+
   public static IdGenerator<JoinTableId> createGenerator() {
     return new IdGenerator<JoinTableId>() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index 2fb9af7..3d98aca 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -21,6 +21,7 @@ package org.apache.impala.planner;
 
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.thrift.TDataSink;
@@ -76,7 +77,7 @@ public class KuduTableSink extends TableSink {
   @Override
   protected TDataSink toThrift() {
     TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK);
-    TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(),
+    TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
         TTableSinkType.KUDU, sinkOp_.toThrift());
     TKuduTableSink tKuduSink = new TKuduTableSink();
     tKuduSink.setReferenced_columns(targetColIdxs_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 6d535fd..4bb51ad 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -51,6 +51,7 @@ import org.apache.impala.analysis.CreateDataSrcStmt;
 import org.apache.impala.analysis.CreateDropRoleStmt;
 import org.apache.impala.analysis.CreateUdaStmt;
 import org.apache.impala.analysis.CreateUdfStmt;
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.DropDataSrcStmt;
 import org.apache.impala.analysis.DropFunctionStmt;
 import org.apache.impala.analysis.DropStatsStmt;
@@ -86,7 +87,6 @@ import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Table;
-import org.apache.impala.catalog.TableId;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
@@ -1208,7 +1208,7 @@ public class Frontend {
       TFinalizeParams finalizeParams = new TFinalizeParams();
       finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
       finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
-      finalizeParams.setTable_id(insertStmt.getTargetTable().getId().asInt());
+      finalizeParams.setTable_id(DescriptorTable.TABLE_SINK_ID);
       String db = insertStmt.getTargetTableName().getDb();
       finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
       HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
@@ -1223,41 +1223,12 @@ public class Frontend {
       result.query_exec_request.stmt_type = TStmtType.DML;
     }
 
-    validateTableIds(analysisResult.getAnalyzer(), result);
-
     timeline.markEvent("Planning finished");
     result.setTimeline(analysisResult.getAnalyzer().getTimeline().toThrift());
     return result;
   }
 
   /**
-   * Check that we don't have any duplicate table IDs (see IMPALA-1702).
-   * To be removed when IMPALA-1702 is resolved.
-   */
-  private void validateTableIds(Analyzer analyzer, TExecRequest result)
-      throws InternalException {
-    Map<TableId, Table> tableIds = Maps.newHashMap();
-    Collection<TupleDescriptor> tupleDescs = analyzer.getDescTbl().getTupleDescs();
-    for (TupleDescriptor desc: tupleDescs) {
-      // Skip if tuple descriptor did not come from materializing scan.
-      if (!desc.isMaterialized()) continue;
-      Table table = desc.getTable();
-      if (table == null) continue;
-      Table otherTable = tableIds.get(table.getId());
-      if (otherTable == table) continue; // Same table referenced twice
-      if (otherTable == null) {
-        tableIds.put(table.getId(), table);
-        continue;
-      }
-      LOG.error("Found duplicate table ID! id=" + table.getId() + "\ntable1=\n"
-          + table.toTCatalogObject() + "\ntable2=\n" + otherTable.toTCatalogObject()
-          + "\nexec_request=\n" + result);
-      throw new InternalException("Query encountered invalid metadata, likely due to " +
-          "IMPALA-1702. Please try rerunning the query.");
-    }
-  }
-
-  /**
    * Attaches the explain result to the TExecRequest.
    */
   private void createExplainRequest(String explainString, TExecRequest result) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 5a53f64..0a9ad66 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -162,8 +162,8 @@ public class FrontendTestBase {
     Preconditions.checkState(createTableStmt.getPartitionColumnDefs().isEmpty());
     Db db = catalog_.getDb(createTableStmt.getDb());
     Preconditions.checkNotNull(db, "Test tables must be created in an existing db.");
-    HdfsTable dummyTable = new HdfsTable(null, null, db, createTableStmt.getTbl(),
-        createTableStmt.getOwner());
+    HdfsTable dummyTable = new HdfsTable(null, db,
+        createTableStmt.getTbl(), createTableStmt.getOwner());
     List<ColumnDef> columnDefs = createTableStmt.getColumnDefs();
     for (int i = 0; i < columnDefs.size(); ++i) {
       ColumnDef colDef = columnDefs.get(i);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/01e7b110/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 9c12b89..a94901b 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,6 +35,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.ColumnLineageGraph;
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.ImpalaException;
@@ -62,6 +64,7 @@ import org.apache.impala.thrift.TQueryExecRequest;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRangeLocations;
 import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTupleDescriptor;
 import org.apache.impala.thrift.TUpdateMembershipRequest;
 import org.apache.impala.util.MembershipSnapshot;
@@ -402,6 +405,7 @@ public class PlannerTestBase extends FrontendTestBase {
     // Test single node plan, scan range locations, and column lineage.
     TExecRequest singleNodeExecRequest =
         testPlan(testCase, Section.PLAN, queryCtx, errorLog, actualOutput);
+    validateTableIds(singleNodeExecRequest);
     checkScanRangeLocations(testCase, singleNodeExecRequest, errorLog, actualOutput);
     checkColumnLineage(testCase, singleNodeExecRequest, errorLog, actualOutput);
     checkLimitCardinality(query, singleNodeExecRequest, errorLog);
@@ -412,6 +416,51 @@ public class PlannerTestBase extends FrontendTestBase {
   }
 
   /**
+   * Validates that all tables in the descriptor table of 'request' have unique ids and
+   * that they are properly referenced by tuple descriptors and the table sink.
+   */
+  private void validateTableIds(TExecRequest request) {
+    if (request == null || !request.isSetQuery_exec_request()) return;
+    TQueryExecRequest execRequest = request.query_exec_request;
+    HashSet<Integer> seenTableIds = Sets.newHashSet();
+    if (execRequest.isSetDesc_tbl()) {
+      TDescriptorTable descTbl = execRequest.desc_tbl;
+      if (descTbl.isSetTableDescriptors()) {
+        for (TTableDescriptor tableDesc: descTbl.tableDescriptors) {
+          if (seenTableIds.contains(tableDesc.id)) {
+            throw new IllegalStateException("Failed to verify table id for table: " +
+                tableDesc.getDbName() + "." + tableDesc.getTableName() +
+                ".\nTable id: " + tableDesc.id + " already used.");
+          }
+          seenTableIds.add(tableDesc.id);
+        }
+      }
+
+      if (descTbl.isSetTupleDescriptors()) {
+        for (TTupleDescriptor tupleDesc: descTbl.tupleDescriptors) {
+          if (tupleDesc.isSetTableId() && !seenTableIds.contains(tupleDesc.tableId)) {
+            throw new IllegalStateException("TableDescriptor does not include table id " +
+                "of:\n" + tupleDesc.toString());
+          }
+        }
+      }
+    }
+
+    if (execRequest.isSetFragments() && !execRequest.fragments.isEmpty()) {
+      TPlanFragment firstPlanFragment = execRequest.fragments.get(0);
+      if (firstPlanFragment.isSetOutput_sink()
+          && firstPlanFragment.output_sink.isSetTable_sink()) {
+        TTableSink tableSink = firstPlanFragment.output_sink.table_sink;
+        if (!seenTableIds.contains(tableSink.target_table_id)
+            || tableSink.target_table_id != DescriptorTable.TABLE_SINK_ID) {
+          throw new IllegalStateException("Table sink id error for target table:\n"
+              + tableSink.toString());
+        }
+      }
+    }
+  }
+
+  /**
