http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java index 3acb1a3..73173cb 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java @@ -17,23 +17,12 @@ package org.apache.impala.analysis; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; -import org.apache.avro.Schema; -import org.apache.avro.SchemaParseException; -import org.apache.hadoop.fs.permission.FsAction; - -import org.apache.impala.authorization.Privilege; -import org.apache.impala.catalog.HdfsStorageDescriptor; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.RowFormat; import org.apache.impala.common.AnalysisException; -import org.apache.impala.common.FileSystemUtil; -import org.apache.impala.thrift.TAccessEvent; -import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TCreateTableParams; import org.apache.impala.thrift.THdfsFileFormat; import org.apache.impala.thrift.TTableName; @@ -41,113 +30,80 @@ import org.apache.impala.util.AvroSchemaConverter; import org.apache.impala.util.AvroSchemaParser; import org.apache.impala.util.AvroSchemaUtils; import org.apache.impala.util.KuduUtil; -import org.apache.impala.util.MetaStoreUtil; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; /** * Represents a CREATE TABLE statement. */ public class CreateTableStmt extends StatementBase { - private List<ColumnDef> columnDefs_; - private final String comment_; - private final boolean isExternal_; - private final boolean ifNotExists_; - private final THdfsFileFormat fileFormat_; - private final ArrayList<ColumnDef> partitionColDefs_; - private final RowFormat rowFormat_; - private TableName tableName_; - private final Map<String, String> tblProperties_; - private final Map<String, String> serdeProperties_; - private final HdfsCachingOp cachingOp_; - private HdfsUri location_; - private final List<DistributeParam> distributeParams_; - - // Set during analysis + + @VisibleForTesting + final static String KUDU_STORAGE_HANDLER_ERROR_MESSAGE = "Kudu tables must be" + + " specified using 'STORED AS KUDU' without using the storage handler table" + + " property."; + + // Table parameters specified in a CREATE TABLE statement + private final TableDef tableDef_; + + // Table owner. Set during analysis private String owner_; - /** - * Builds a CREATE TABLE statement - * @param tableName - Name of the new table - * @param columnDefs - List of column definitions for the table - * @param partitionColumnDefs - List of partition column definitions for the table - * @param isExternal - If true, the table's data will be preserved if dropped. - * @param comment - Comment to attach to the table - * @param rowFormat - Custom row format of the table. Use RowFormat.DEFAULT_ROW_FORMAT - * to specify default row format. 
- * @param fileFormat - File format of the table - * @param location - The HDFS location of where the table data will stored. - * @param cachingOp - The HDFS caching op that should be applied to this table. - * @param ifNotExists - If true, no errors are thrown if the table already exists. - * @param tblProperties - Optional map of key/values to persist with table metadata. - * @param serdeProperties - Optional map of key/values to persist with table serde - * metadata. - */ - public CreateTableStmt(TableName tableName, List<ColumnDef> columnDefs, - List<ColumnDef> partitionColumnDefs, boolean isExternal, String comment, - RowFormat rowFormat, THdfsFileFormat fileFormat, HdfsUri location, - HdfsCachingOp cachingOp, boolean ifNotExists, Map<String, String> tblProperties, - Map<String, String> serdeProperties, List<DistributeParam> distributeParams) { - Preconditions.checkNotNull(columnDefs); - Preconditions.checkNotNull(partitionColumnDefs); - Preconditions.checkNotNull(fileFormat); - Preconditions.checkNotNull(rowFormat); - Preconditions.checkNotNull(tableName); - - columnDefs_ = Lists.newArrayList(columnDefs); - comment_ = comment; - isExternal_ = isExternal; - ifNotExists_ = ifNotExists; - fileFormat_ = fileFormat; - location_ = location; - cachingOp_ = cachingOp; - partitionColDefs_ = Lists.newArrayList(partitionColumnDefs); - rowFormat_ = rowFormat; - tableName_ = tableName; - tblProperties_ = tblProperties; - serdeProperties_ = serdeProperties; - unescapeProperties(tblProperties_); - unescapeProperties(serdeProperties_); - distributeParams_ = distributeParams; + public CreateTableStmt(TableDef tableDef) { + Preconditions.checkNotNull(tableDef); + tableDef_ = tableDef; } /** * Copy c'tor. */ - public CreateTableStmt(CreateTableStmt other) { - columnDefs_ = Lists.newArrayList(other.columnDefs_); - comment_ = other.comment_; - isExternal_ = other.isExternal_; - ifNotExists_ = other.ifNotExists_; - fileFormat_ = other.fileFormat_; - location_ = other.location_; - cachingOp_ = other.cachingOp_; - partitionColDefs_ = Lists.newArrayList(other.partitionColDefs_); - rowFormat_ = other.rowFormat_; - tableName_ = other.tableName_; - tblProperties_ = other.tblProperties_; - serdeProperties_ = other.serdeProperties_; - distributeParams_ = other.distributeParams_; + CreateTableStmt(CreateTableStmt other) { + this(other.tableDef_); + owner_ = other.owner_; } @Override public CreateTableStmt clone() { return new CreateTableStmt(this); } - public String getTbl() { return tableName_.getTbl(); } - public TableName getTblName() { return tableName_; } - public List<ColumnDef> getColumnDefs() { return columnDefs_; } - public List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; } - public String getComment() { return comment_; } - public boolean isExternal() { return isExternal_; } - public boolean getIfNotExists() { return ifNotExists_; } - public HdfsUri getLocation() { return location_; } - public void setLocation(HdfsUri location) { this.location_ = location; } - public THdfsFileFormat getFileFormat() { return fileFormat_; } - public RowFormat getRowFormat() { return rowFormat_; } - public Map<String, String> getTblProperties() { return tblProperties_; } - public Map<String, String> getSerdeProperties() { return serdeProperties_; } + public String getTbl() { return getTblName().getTbl(); } + public TableName getTblName() { return tableDef_.getTblName(); } + public boolean getIfNotExists() { return tableDef_.getIfNotExists(); } + public List<ColumnDef> getColumnDefs() { return 
tableDef_.getColumnDefs(); } + private void setColumnDefs(List<ColumnDef> colDefs) { + getColumnDefs().clear(); + getColumnDefs().addAll(colDefs); + } + private List<ColumnDef> getPrimaryKeyColumnDefs() { + return tableDef_.getPrimaryKeyColumnDefs(); + } + public boolean isExternal() { return tableDef_.isExternal(); } + public List<ColumnDef> getPartitionColumnDefs() { + return tableDef_.getPartitionColumnDefs(); + } + public List<DistributeParam> getDistributeParams() { + return tableDef_.getDistributeParams(); + } + public String getComment() { return tableDef_.getComment(); } + Map<String, String> getTblProperties() { return tableDef_.getTblProperties(); } + private HdfsCachingOp getCachingOp() { return tableDef_.getCachingOp(); } + public HdfsUri getLocation() { return tableDef_.getLocation(); } + Map<String, String> getSerdeProperties() { return tableDef_.getSerdeProperties(); } + public THdfsFileFormat getFileFormat() { return tableDef_.getFileFormat(); } + RowFormat getRowFormat() { return tableDef_.getRowFormat(); } + + // Only exposed for ToSqlUtils. Returns the list of primary keys declared by the user + // at the table level. Note that primary keys may also be declared in column + // definitions, those are not included here (they are stored in the ColumnDefs). + List<String> getTblPrimaryKeyColumnNames() { + return tableDef_.getPrimaryKeyColumnNames(); + } /** * Can only be called after analysis, returns the owner of this table (the user from @@ -164,7 +120,7 @@ public class CreateTableStmt extends StatementBase { */ public String getDb() { Preconditions.checkState(isAnalyzed()); - return tableName_.getDb(); + return getTblName().getDb(); } @Override @@ -173,240 +129,246 @@ public class CreateTableStmt extends StatementBase { public TCreateTableParams toThrift() { TCreateTableParams params = new TCreateTableParams(); params.setTable_name(new TTableName(getDb(), getTbl())); - for (ColumnDef col: getColumnDefs()) { - params.addToColumns(col.toThrift()); - } + List<org.apache.impala.thrift.TColumn> tColumns = Lists.newArrayList(); + for (ColumnDef col: getColumnDefs()) tColumns.add(col.toThrift()); + params.setColumns(tColumns); for (ColumnDef col: getPartitionColumnDefs()) { params.addToPartition_columns(col.toThrift()); } params.setOwner(getOwner()); params.setIs_external(isExternal()); - params.setComment(comment_); - params.setLocation(location_ == null ? null : location_.toString()); - if (cachingOp_ != null) params.setCache_op(cachingOp_.toThrift()); - params.setRow_format(rowFormat_.toThrift()); - params.setFile_format(fileFormat_); + params.setComment(getComment()); + params.setLocation(getLocation() == null ? 
null : getLocation().toString()); + if (getCachingOp() != null) params.setCache_op(getCachingOp().toThrift()); + if (getRowFormat() != null) params.setRow_format(getRowFormat().toThrift()); + params.setFile_format(getFileFormat()); params.setIf_not_exists(getIfNotExists()); - if (tblProperties_ != null) params.setTable_properties(tblProperties_); - if (serdeProperties_ != null) params.setSerde_properties(serdeProperties_); - if (distributeParams_ != null) { - for (DistributeParam d : distributeParams_) { - params.addToDistribute_by(d.toThrift()); - } + params.setTable_properties(getTblProperties()); + params.setSerde_properties(getSerdeProperties()); + for (DistributeParam d: getDistributeParams()) { + params.addToDistribute_by(d.toThrift()); + } + for (ColumnDef pkColDef: getPrimaryKeyColumnDefs()) { + params.addToPrimary_key_column_names(pkColDef.getColName()); } + return params; } @Override public void analyze(Analyzer analyzer) throws AnalysisException { super.analyze(analyzer); - Preconditions.checkState(tableName_ != null && !tableName_.isEmpty()); - tableName_ = analyzer.getFqTableName(tableName_); - tableName_.analyze(); owner_ = analyzer.getUser().getName(); - - MetaStoreUtil.checkShortPropertyMap("Property", tblProperties_); - MetaStoreUtil.checkShortPropertyMap("Serde property", serdeProperties_); - - if (analyzer.dbContainsTable(tableName_.getDb(), tableName_.getTbl(), - Privilege.CREATE) && !ifNotExists_) { - throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + tableName_); - } - - analyzer.addAccessEvent(new TAccessEvent(tableName_.toString(), - TCatalogObjectType.TABLE, Privilege.CREATE.toString())); - - // Only Avro tables can have empty column defs because they can infer them from - // the Avro schema. - if (columnDefs_.isEmpty() && fileFormat_ != THdfsFileFormat.AVRO) { + tableDef_.analyze(analyzer); + analyzeKuduFormat(analyzer); + // Avro tables can have empty column defs because they can infer them from the Avro + // schema. Likewise for external Kudu tables, the schema can be read from Kudu. + if (getColumnDefs().isEmpty() && getFileFormat() != THdfsFileFormat.AVRO + && getFileFormat() != THdfsFileFormat.KUDU) { throw new AnalysisException("Table requires at least 1 column"); } - - if (location_ != null) { - location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE); - } - - analyzeRowFormat(analyzer); - - // Check that all the column names are valid and unique. - analyzeColumnDefs(analyzer); - - if (getTblProperties() != null && KuduTable.KUDU_STORAGE_HANDLER.equals( - getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) { - analyzeKuduTable(analyzer); - } else if (distributeParams_ != null) { - throw new AnalysisException("Only Kudu tables can use DISTRIBUTE BY clause."); - } - - if (fileFormat_ == THdfsFileFormat.AVRO) { - columnDefs_ = analyzeAvroSchema(analyzer); - if (columnDefs_.isEmpty()) { + if (getFileFormat() == THdfsFileFormat.AVRO) { + setColumnDefs(analyzeAvroSchema(analyzer)); + if (getColumnDefs().isEmpty()) { throw new AnalysisException( "An Avro table requires column definitions or an Avro schema."); } - AvroSchemaUtils.setFromSerdeComment(columnDefs_); - analyzeColumnDefs(analyzer); + AvroSchemaUtils.setFromSerdeComment(getColumnDefs()); } + } - if (cachingOp_ != null) { - cachingOp_.analyze(analyzer); - if (cachingOp_.shouldCache() && location_ != null && - !FileSystemUtil.isPathCacheable(location_.getPath())) { - throw new AnalysisException(String.format("Location '%s' cannot be cached. 
" + - "Please retry without caching: CREATE TABLE %s ... UNCACHED", - location_.toString(), tableName_)); + /** + * Analyzes the parameters of a CREATE TABLE ... STORED AS KUDU statement. Also checks + * if Kudu specific properties and parameters are specified for non-Kudu tables. + */ + private void analyzeKuduFormat(Analyzer analyzer) throws AnalysisException { + if (getFileFormat() != THdfsFileFormat.KUDU) { + if (KuduTable.KUDU_STORAGE_HANDLER.equals( + getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) { + throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE); + } + AnalysisUtils.throwIfNotEmpty(getDistributeParams(), + "Only Kudu tables can use the DISTRIBUTE BY clause."); + if (hasPrimaryKey()) { + throw new AnalysisException("Only Kudu tables can specify a PRIMARY KEY."); } + return; } - // Analyze 'skip.header.line.format' property. - if (tblProperties_ != null) { - AlterTableSetTblProperties.analyzeSkipHeaderLineCount(tblProperties_); + analyzeKuduTableProperties(analyzer); + if (isExternal()) { + analyzeExternalKuduTableParams(); + } else { + analyzeManagedKuduTableParams(analyzer); } } - private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException { - Byte fieldDelim = analyzeRowFormatValue(rowFormat_.getFieldDelimiter()); - Byte lineDelim = analyzeRowFormatValue(rowFormat_.getLineDelimiter()); - Byte escapeChar = analyzeRowFormatValue(rowFormat_.getEscapeChar()); - if (fileFormat_ == THdfsFileFormat.TEXT) { - if (fieldDelim == null) fieldDelim = HdfsStorageDescriptor.DEFAULT_FIELD_DELIM; - if (lineDelim == null) lineDelim = HdfsStorageDescriptor.DEFAULT_LINE_DELIM; - if (escapeChar == null) escapeChar = HdfsStorageDescriptor.DEFAULT_ESCAPE_CHAR; - if (fieldDelim != null && lineDelim != null && fieldDelim.equals(lineDelim)) { - throw new AnalysisException("Field delimiter and line delimiter have same " + - "value: byte " + fieldDelim); - } - if (fieldDelim != null && escapeChar != null && fieldDelim.equals(escapeChar)) { - analyzer.addWarning("Field delimiter and escape character have same value: " + - "byte " + fieldDelim + ". Escape character will be ignored"); - } - if (lineDelim != null && escapeChar != null && lineDelim.equals(escapeChar)) { - analyzer.addWarning("Line delimiter and escape character have same value: " + - "byte " + lineDelim + ". Escape character will be ignored"); + /** + * Analyzes and checks table properties which are common to both managed and external + * Kudu tables. + */ + private void analyzeKuduTableProperties(Analyzer analyzer) throws AnalysisException { + if (getTblProperties().containsKey(KuduTable.KEY_STORAGE_HANDLER)) { + throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE); + } + getTblProperties().put(KuduTable.KEY_STORAGE_HANDLER, KuduTable.KUDU_STORAGE_HANDLER); + + String masterHosts = getTblProperties().get(KuduTable.KEY_MASTER_HOSTS); + if (Strings.isNullOrEmpty(masterHosts)) { + masterHosts = analyzer.getCatalog().getDefaultKuduMasterHosts(); + if (masterHosts.isEmpty()) { + throw new AnalysisException(String.format( + "Table property '%s' is required when the impalad startup flag " + + "-kudu_master_hosts is not used.", KuduTable.KEY_MASTER_HOSTS)); } + getTblProperties().put(KuduTable.KEY_MASTER_HOSTS, masterHosts); } + + // TODO: Find out what is creating a directory in HDFS and stop doing that. Kudu + // tables shouldn't have HDFS dirs. 
+ // https://issues.cloudera.org/browse/IMPALA-3570 + AnalysisUtils.throwIfNotNull(getCachingOp(), + "A Kudu table cannot be cached in HDFS."); + AnalysisUtils.throwIfNotNull(getLocation(), "LOCATION cannot be specified for a " + + "Kudu table."); + AnalysisUtils.throwIfNotEmpty(tableDef_.getPartitionColumnDefs(), + "PARTITIONED BY cannot be used in Kudu tables."); } /** - * Analyzes columnDefs_ and partitionColDefs_ checking whether all column - * names are unique. + * Analyzes and checks parameters specified for external Kudu tables. */ - private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException { - Set<String> colNames = Sets.newHashSet(); - for (ColumnDef colDef: columnDefs_) { - colDef.analyze(); - if (!colNames.add(colDef.getColName().toLowerCase())) { - throw new AnalysisException("Duplicate column name: " + colDef.getColName()); - } + private void analyzeExternalKuduTableParams() throws AnalysisException { + AnalysisUtils.throwIfNull(getTblProperties().get(KuduTable.KEY_TABLE_NAME), + String.format("Table property %s must be specified when creating " + + "an external Kudu table.", KuduTable.KEY_TABLE_NAME)); + if (hasPrimaryKey() + || getTblProperties().containsKey(KuduTable.KEY_KEY_COLUMNS)) { + throw new AnalysisException("Primary keys cannot be specified for an external " + + "Kudu table"); } - for (ColumnDef colDef: partitionColDefs_) { - colDef.analyze(); - if (!colDef.getType().supportsTablePartitioning()) { - throw new AnalysisException( - String.format("Type '%s' is not supported as partition-column type " + - "in column: %s", colDef.getType().toSql(), colDef.getColName())); + AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS), + String.format("Table property '%s' cannot be used with an external Kudu table.", + KuduTable.KEY_TABLET_REPLICAS)); + AnalysisUtils.throwIfNotEmpty(getColumnDefs(), + "Columns cannot be specified with an external Kudu table."); + AnalysisUtils.throwIfNotEmpty(getDistributeParams(), + "DISTRIBUTE BY cannot be used with an external Kudu table."); + } + + /** + * Analyzes and checks parameters specified for managed Kudu tables. + */ + private void analyzeManagedKuduTableParams(Analyzer analyzer) throws AnalysisException { + // If no Kudu table name is specified in tblproperties, generate one using the + // current database as a prefix to avoid conflicts in Kudu. + if (!getTblProperties().containsKey(KuduTable.KEY_TABLE_NAME)) { + getTblProperties().put(KuduTable.KEY_TABLE_NAME, + KuduUtil.getDefaultCreateKuduTableName(getDb(), getTbl())); + } + AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_KEY_COLUMNS), + String.format("PRIMARY KEY must be used instead of the table property '%s'.", + KuduTable.KEY_KEY_COLUMNS)); + if (!hasPrimaryKey()) { + throw new AnalysisException("A primary key is required for a Kudu table."); + } + String tabletReplicas = getTblProperties().get(KuduTable.KEY_TABLET_REPLICAS); + if (tabletReplicas != null) { + Integer r = Ints.tryParse(tabletReplicas); + if (r == null) { + throw new AnalysisException(String.format( + "Table property '%s' must be an integer.", KuduTable.KEY_TABLET_REPLICAS)); + } + if (r <= 0) { + throw new AnalysisException("Number of tablet replicas must be greater than " + + "zero. 
Given number of replicas is: " + r.toString()); } - if (!colNames.add(colDef.getColName().toLowerCase())) { - throw new AnalysisException("Duplicate column name: " + colDef.getColName()); + } + + if (!getDistributeParams().isEmpty()) { + analyzeDistributeParams(analyzer); + } else { + throw new AnalysisException("Table distribution must be specified for " + + "managed Kudu tables."); + } + } + + /** + * Analyzes the distribution schemes specified in the CREATE TABLE statement. + */ + private void analyzeDistributeParams(Analyzer analyzer) throws AnalysisException { + Preconditions.checkState(getFileFormat() == THdfsFileFormat.KUDU); + Map<String, ColumnDef> pkColDefsByName = + ColumnDef.mapByColumnNames(getPrimaryKeyColumnDefs()); + for (DistributeParam distributeParam: getDistributeParams()) { + // If no column names were specified in this distribution scheme, use all the + // primary key columns. + if (!distributeParam.hasColumnNames()) { + distributeParam.setColumnNames(pkColDefsByName.keySet()); } + distributeParam.setPkColumnDefMap(pkColDefsByName); + distributeParam.analyze(analyzer); } } /** - * Analyzes the Avro schema and compares it with the columnDefs_ to detect + * Checks if a primary key is specified in a CREATE TABLE stmt. Should only be called + * after tableDef_ has been analyzed. + */ + private boolean hasPrimaryKey() { + Preconditions.checkState(tableDef_.isAnalyzed()); + return !tableDef_.getPrimaryKeyColumnDefs().isEmpty(); + } + + /** + * Analyzes the Avro schema and compares it with the getColumnDefs() to detect * inconsistencies. Returns a list of column descriptors that should be - * used for creating the table (possibly identical to columnDefs_). + * used for creating the table (possibly identical to getColumnDefs()). */ - private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer) - throws AnalysisException { - Preconditions.checkState(fileFormat_ == THdfsFileFormat.AVRO); + private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer) throws AnalysisException { + Preconditions.checkState(getFileFormat() == THdfsFileFormat.AVRO); // Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with latter // taking precedence. List<Map<String, String>> schemaSearchLocations = Lists.newArrayList(); - schemaSearchLocations.add(serdeProperties_); - schemaSearchLocations.add(tblProperties_); - String avroSchema = null; - List<ColumnDef> avroCols = null; // parsed from avroSchema + schemaSearchLocations.add(getSerdeProperties()); + schemaSearchLocations.add(getTblProperties()); + String avroSchema; + List<ColumnDef> avroCols; // parsed from avroSchema try { avroSchema = AvroSchemaUtils.getAvroSchema(schemaSearchLocations); if (avroSchema == null) { // No Avro schema was explicitly set in the serde or table properties, so infer // the Avro schema from the column definitions. 
Schema inferredSchema = AvroSchemaConverter.convertColumnDefs( - columnDefs_, tableName_.toString()); + getColumnDefs(), getTblName().toString()); avroSchema = inferredSchema.toString(); } if (Strings.isNullOrEmpty(avroSchema)) { throw new AnalysisException("Avro schema is null or empty: " + - tableName_.toString()); + getTblName().toString()); } avroCols = AvroSchemaParser.parse(avroSchema); } catch (SchemaParseException e) { throw new AnalysisException(String.format( - "Error parsing Avro schema for table '%s': %s", tableName_.toString(), + "Error parsing Avro schema for table '%s': %s", getTblName().toString(), e.getMessage())); } Preconditions.checkNotNull(avroCols); - // Analyze the Avro schema to detect inconsistencies with the columnDefs_. + // Analyze the Avro schema to detect inconsistencies with the getColumnDefs(). // In case of inconsistencies, the column defs are ignored in favor of the Avro // schema for simplicity and, in particular, to enable COMPUTE STATS (IMPALA-1104). StringBuilder warning = new StringBuilder(); List<ColumnDef> reconciledColDefs = - AvroSchemaUtils.reconcileSchemas(columnDefs_, avroCols, warning); + AvroSchemaUtils.reconcileSchemas(getColumnDefs(), avroCols, warning); if (warning.length() > 0) analyzer.addWarning(warning.toString()); return reconciledColDefs; } - private void analyzeKuduTable(Analyzer analyzer) throws AnalysisException { - // Validate that Kudu table is correctly specified. - if (!KuduTable.tableParamsAreValid(getTblProperties())) { - throw new AnalysisException("Kudu table is missing parameters " + - String.format("in table properties. Please verify if %s, %s, and %s are " - + "present and have valid values.", - KuduTable.KEY_TABLE_NAME, KuduTable.KEY_MASTER_ADDRESSES, - KuduTable.KEY_KEY_COLUMNS)); - } - - // Kudu table cannot be a cached table - if (cachingOp_ != null) { - throw new AnalysisException("A Kudu table cannot be cached in HDFS."); - } - - if (distributeParams_ != null) { - if (isExternal_) { - throw new AnalysisException( - "The DISTRIBUTE BY clause may not be specified for external tables."); - } - - List<String> keyColumns = KuduUtil.parseKeyColumnsAsList( - getTblProperties().get(KuduTable.KEY_KEY_COLUMNS)); - for (DistributeParam d : distributeParams_) { - // If the columns are not set, default to all key columns - if (d.getColumns() == null) d.setColumns(keyColumns); - d.analyze(analyzer); - } - } else if (!isExternal_) { - throw new AnalysisException( - "A data distribution must be specified using the DISTRIBUTE BY clause."); - } - } - - private Byte analyzeRowFormatValue(String value) throws AnalysisException { - if (value == null) return null; - Byte byteVal = HdfsStorageDescriptor.parseDelim(value); - if (byteVal == null) { - throw new AnalysisException("ESCAPED BY values and LINE/FIELD " + - "terminators must be specified as a single character or as a decimal " + - "value in the range [-128:127]: " + value); - } - return byteVal; - } - /** * Unescapes all values in the property map. */ - public static void unescapeProperties(Map<String, String> propertyMap) { + static void unescapeProperties(Map<String, String> propertyMap) { if (propertyMap == null) return; for (Map.Entry<String, String> kv : propertyMap.entrySet()) { propertyMap.put(kv.getKey(),
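For reference, the statements exercised by the new Kudu analysis path above (analyzeKuduFormat(), analyzeKuduTableProperties(), analyzeManagedKuduTableParams()) look roughly like the following. Table and column names are illustrative only, and the exact grammar is defined by the parser changes accompanying this patch; the clauses shown (a table-level PRIMARY KEY, DISTRIBUTE BY ... INTO ... BUCKETS, STORED AS KUDU) are the ones referenced by the error messages in the new code:

    -- Illustrative managed Kudu table (hypothetical names).
    CREATE TABLE kudu_events (
      id BIGINT,
      name STRING,
      PRIMARY KEY (id)
    )
    DISTRIBUTE BY HASH (id) INTO 4 BUCKETS
    STORED AS KUDU;

When TBLPROPERTIES carries no Kudu table name, analyzeManagedKuduTableParams() generates a default one via KuduUtil.getDefaultCreateKuduTableName(); when the master-hosts table property is absent, analyzeKuduTableProperties() falls back to the catalog default taken from the impalad -kudu_master_hosts startup flag; and when DISTRIBUTE BY names no columns, analyzeDistributeParams() defaults to all primary key columns.
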
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java index 319fe50..34bed86 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java +++ b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java @@ -17,19 +17,20 @@ package org.apache.impala.analysis; -import java.math.BigDecimal; -import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TDistributeByHashParam; import org.apache.impala.thrift.TDistributeByRangeParam; import org.apache.impala.thrift.TDistributeParam; -import org.apache.impala.thrift.TDistributeType; import org.apache.impala.thrift.TRangeLiteral; import org.apache.impala.thrift.TRangeLiteralList; +import org.apache.impala.util.KuduUtil; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Lists; /** @@ -50,16 +51,16 @@ public class DistributeParam implements ParseNode { /** * Creates a DistributeParam partitioned by hash. */ - public static DistributeParam createHashParam(List<String> cols, BigDecimal buckets) { - return new DistributeParam(Type.HASH, cols, buckets); + public static DistributeParam createHashParam(List<String> cols, int buckets) { + return new DistributeParam(Type.HASH, cols, buckets, null); } /** * Creates a DistributeParam partitioned by range. */ public static DistributeParam createRangeParam(List<String> cols, - ArrayList<ArrayList<LiteralExpr>> splitRows) { - return new DistributeParam(Type.RANGE, cols, splitRows); + List<List<LiteralExpr>> splitRows) { + return new DistributeParam(Type.RANGE, cols, NO_BUCKETS, splitRows); } private static final int NO_BUCKETS = -1; @@ -69,131 +70,159 @@ public class DistributeParam implements ParseNode { */ public enum Type { HASH, RANGE - }; + } + + // May be empty indicating that all keys in the table should be used. + private final List<String> colNames_ = Lists.newArrayList(); - private List<String> columns_; + // Map of primary key column names to the associated column definitions. Must be set + // before the call to analyze(). 
+ private Map<String, ColumnDef> pkColumnDefByName_; + // Distribution type private final Type type_; // Only relevant for hash partitioning, -1 otherwise - private final int num_buckets_; + private final int numBuckets_; // Only relevant for range partitioning, null otherwise - private final ArrayList<ArrayList<LiteralExpr>> splitRows_; - - // Set in analyze() - private TDistributeByRangeParam rangeParam_; - - private DistributeParam(Type t, List<String> cols, BigDecimal buckets) { - type_ = t; - columns_ = cols; - num_buckets_ = buckets.intValue(); - splitRows_ = null; - } + private final List<List<LiteralExpr>> splitRows_; - private DistributeParam(Type t, List<String> cols, - ArrayList<ArrayList<LiteralExpr>> splitRows) { + private DistributeParam(Type t, List<String> colNames, int buckets, + List<List<LiteralExpr>> splitRows) { type_ = t; - columns_ = cols; + for (String name: colNames) colNames_.add(name.toLowerCase()); + numBuckets_ = buckets; splitRows_ = splitRows; - num_buckets_ = NO_BUCKETS; } - /** - * TODO Refactor the logic below to analyze 'columns_'. This analysis should output - * a vector of column types that would then be used during the analysis of the split - * rows. - */ @Override public void analyze(Analyzer analyzer) throws AnalysisException { - if (type_ == Type.HASH && num_buckets_ <= 1) { - throw new AnalysisException(String.format( - "Number of buckets in DISTRIBUTE BY clause '%s' must be larger than 1.", - toSql())); - } else if (type_ == Type.RANGE) { - // Creating the thrift structure simultaneously checks for semantic errors - rangeParam_ = new TDistributeByRangeParam(); - rangeParam_.setColumns(columns_); - - for (ArrayList<LiteralExpr> splitRow : splitRows_) { - TRangeLiteralList list = new TRangeLiteralList(); - if (splitRow.size() != columns_.size()) { + Preconditions.checkState(!colNames_.isEmpty()); + Preconditions.checkNotNull(pkColumnDefByName_); + Preconditions.checkState(!pkColumnDefByName_.isEmpty()); + // Validate the columns specified in the DISTRIBUTE BY clause + for (String colName: colNames_) { + if (!pkColumnDefByName_.containsKey(colName)) { + throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " + + "column. Only key columns can be used in DISTRIBUTE BY.", colName, toSql())); + } + } + + if (type_ == Type.RANGE) { + for (List<LiteralExpr> splitRow : splitRows_) { + if (splitRow.size() != colNames_.size()) { throw new AnalysisException(String.format( "SPLIT ROWS has different size than number of projected key columns: %d. 
" - + "Split row: %s", columns_.size(), splitRowToString(splitRow))); + + "Split row: %s", colNames_.size(), splitRowToString(splitRow))); } - for (LiteralExpr expr : splitRow) { + for (int i = 0; i < splitRow.size(); ++i) { + LiteralExpr expr = splitRow.get(i); + ColumnDef colDef = pkColumnDefByName_.get(colNames_.get(i)); + org.apache.impala.catalog.Type colType = colDef.getType(); + Preconditions.checkState(KuduUtil.isSupportedKeyType(colType)); expr.analyze(analyzer); - TRangeLiteral literal = new TRangeLiteral(); - if (expr instanceof NumericLiteral) { - NumericLiteral num = (NumericLiteral) expr; - if (num.getType().isDecimal() || num.getType().isFloatingPointType()) { - throw new AnalysisException("Only integral and string values allowed for" + - " split rows."); - } else { - literal.setInt_literal(num.getIntValue()); - } - } else if (expr instanceof StringLiteral) { - StringLiteral string = (StringLiteral) expr; - literal.setString_literal(string.getStringValue()); - } else if (expr instanceof BoolLiteral) { - BoolLiteral bool = (BoolLiteral) expr; - literal.setBool_literal(bool.getValue()); - } else { - throw new AnalysisException(String.format("Split row value is not supported: " - + "%s (Type: %s).", expr.getStringValue(), expr.getType().toSql())); + org.apache.impala.catalog.Type exprType = expr.getType(); + if (exprType.isNull()) { + throw new AnalysisException("Split values cannot be NULL. Split row: " + + splitRowToString(splitRow)); + } + if (!org.apache.impala.catalog.Type.isImplicitlyCastable(exprType, colType, + true)) { + throw new AnalysisException(String.format("Split value %s (type: %s) is " + + "not type compatible with column '%s' (type: %s).", expr.toSql(), + exprType, colDef.getColName(), colType.toSql())); } - list.addToValues(literal); } - rangeParam_.addToSplit_rows(list); } } } @Override public String toSql() { - if (num_buckets_ == NO_BUCKETS) { - List<String> splitRowStrings = Lists.newArrayList(); - for (ArrayList<LiteralExpr> splitRow : splitRows_) { - splitRowStrings.add(splitRowToString(splitRow)); - } - return String.format("RANGE(%s) INTO RANGES(%s)", Joiner.on(", ").join(columns_), - Joiner.on(", ").join(splitRowStrings)); + StringBuilder builder = new StringBuilder(type_.toString()); + if (!colNames_.isEmpty()) { + builder.append(" ("); + Joiner.on(", ").appendTo(builder, colNames_).append(")"); + } + if (type_ == Type.HASH) { + builder.append(" INTO "); + Preconditions.checkState(numBuckets_ != NO_BUCKETS); + builder.append(numBuckets_).append(" BUCKETS"); } else { - return String.format("HASH(%s) INTO %d BUCKETS", Joiner.on(", ").join(columns_), - num_buckets_); + builder.append(" SPLIT ROWS ("); + if (splitRows_ == null) { + builder.append("..."); + } else { + for (List<LiteralExpr> splitRow: splitRows_) { + builder.append(splitRowToString(splitRow)); + } + } + builder.append(")"); } + return builder.toString(); } - private String splitRowToString(ArrayList<LiteralExpr> splitRow) { - StringBuilder builder = new StringBuilder(); - builder.append("("); - List<String> rangeElementStrings = Lists.newArrayList(); - for (LiteralExpr rangeElement : splitRow) { - rangeElementStrings.add(rangeElement.toSql()); + @Override + public String toString() { return toSql(); } + + private String splitRowToString(List<LiteralExpr> splitRow) { + StringBuilder builder = new StringBuilder("("); + for (LiteralExpr expr: splitRow) { + if (builder.length() > 1) builder.append(", "); + builder.append(expr.toSql()); } - builder.append(Joiner.on(", 
").join(rangeElementStrings)); - builder.append(")"); - return builder.toString(); + return builder.append(")").toString(); } - TDistributeParam toThrift() { + public TDistributeParam toThrift() { TDistributeParam result = new TDistributeParam(); + // TODO: Add a validate() function to ensure the validity of distribute params. if (type_ == Type.HASH) { TDistributeByHashParam hash = new TDistributeByHashParam(); - hash.setNum_buckets(num_buckets_); - hash.setColumns(columns_); + Preconditions.checkState(numBuckets_ != NO_BUCKETS); + hash.setNum_buckets(numBuckets_); + hash.setColumns(colNames_); result.setBy_hash_param(hash); } else { Preconditions.checkState(type_ == Type.RANGE); - - result.setBy_range_param(rangeParam_); + TDistributeByRangeParam rangeParam = new TDistributeByRangeParam(); + rangeParam.setColumns(colNames_); + if (splitRows_ == null) { + result.setBy_range_param(rangeParam); + return result; + } + for (List<LiteralExpr> splitRow : splitRows_) { + TRangeLiteralList list = new TRangeLiteralList(); + for (int i = 0; i < splitRow.size(); ++i) { + LiteralExpr expr = splitRow.get(i); + TRangeLiteral literal = new TRangeLiteral(); + if (expr instanceof NumericLiteral) { + literal.setInt_literal(((NumericLiteral)expr).getIntValue()); + } else { + String exprValue = expr.getStringValue(); + Preconditions.checkState(!Strings.isNullOrEmpty(exprValue)); + literal.setString_literal(exprValue); + } + list.addToValues(literal); + } + rangeParam.addToSplit_rows(list); + } + result.setBy_range_param(rangeParam); } return result; } - public List<String> getColumns() { return columns_; } - public void setColumns(List<String> cols) { columns_ = cols; } - public Type getType_() { return type_; } - public int getNumBuckets() { return num_buckets_; } + void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) { + pkColumnDefByName_ = pkColumnDefByName; + } + + boolean hasColumnNames() { return !colNames_.isEmpty(); } + + void setColumnNames(Collection<String> colNames) { + Preconditions.checkState(colNames_.isEmpty()); + colNames_.addAll(colNames); + } + + public Type getType() { return type_; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java index 24b8417..28de1a8 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java @@ -190,7 +190,7 @@ public abstract class ModifyStmt extends StatementBase { // cast result expressions to the correct type of the referenced slot of the // target table - int keyColumnsOffset = table_.getKuduKeyColumnNames().size(); + int keyColumnsOffset = table_.getPrimaryKeyColumnNames().size(); for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) { sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo( assignments_.get(i - keyColumnsOffset).first.getType())); @@ -225,7 +225,7 @@ public abstract class ModifyStmt extends StatementBase { } // Add the key columns as slot refs - for (String k : table_.getKuduKeyColumnNames()) { + for (String k : table_.getPrimaryKeyColumnNames()) { ArrayList<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), k); SlotRef ref = new SlotRef(path); ref.analyze(analyzer); 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java new file mode 100644 index 0000000..4d3ed80 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.analysis; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Represents the PARTITION BY and DISTRIBUTED BY clauses of a DDL statement. + * TODO: Reconsider this class when we add support for new range partitioning syntax (see + * IMPALA-3724). + */ +class TableDataLayout { + + private final List<ColumnDef> partitionColDefs_; + private final List<DistributeParam> distributeParams_; + + private TableDataLayout(List<ColumnDef> partitionColumnDefs, + List<DistributeParam> distributeParams) { + partitionColDefs_ = partitionColumnDefs; + distributeParams_ = distributeParams; + } + + static TableDataLayout createPartitionedLayout(List<ColumnDef> partitionColumnDefs) { + return new TableDataLayout(partitionColumnDefs, + Lists.<DistributeParam>newArrayList()); + } + + static TableDataLayout createDistributedLayout(List<DistributeParam> distributeParams) { + return new TableDataLayout(Lists.<ColumnDef>newArrayList(), distributeParams); + } + + static TableDataLayout createEmptyLayout() { + return new TableDataLayout(Lists.<ColumnDef>newArrayList(), + Lists.<DistributeParam>newArrayList()); + } + + List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; } + List<DistributeParam> getDistributeParams() { return distributeParams_; } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/TableDef.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java new file mode 100644 index 0000000..ce08e36 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.analysis; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.impala.authorization.Privilege; +import org.apache.impala.catalog.HdfsStorageDescriptor; +import org.apache.impala.catalog.RowFormat; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.FileSystemUtil; +import org.apache.impala.thrift.TAccessEvent; +import org.apache.impala.thrift.TCatalogObjectType; +import org.apache.impala.thrift.THdfsFileFormat; +import org.apache.impala.util.MetaStoreUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.hadoop.fs.permission.FsAction; + +/** + * Represents the table parameters in a CREATE TABLE statement. These parameters + * correspond to the following clauses in a CREATE TABLE statement: + * - EXTERNAL + * - IF NOT EXISTS + * - PARTITIONED BY + * - DISTRIBUTE BY + * - ROWFORMAT + * - FILEFORMAT + * - COMMENT + * - SERDEPROPERTIES + * - TBLPROPERTIES + * - LOCATION + * - CACHED IN + */ +class TableDef { + + // Name of the new table + private final TableName tableName_; + + // List of column definitions + private final List<ColumnDef> columnDefs_ = Lists.newArrayList(); + + // Names of primary key columns. Populated by the parser. An empty value doesn't + // mean no primary keys were specified as the columnDefs_ could contain primary keys. + private final List<String> primaryKeyColNames_ = Lists.newArrayList(); + + // Authoritative list of primary key column definitions populated during analysis. + private final List<ColumnDef> primaryKeyColDefs_ = Lists.newArrayList(); + + // If true, the table's data will be preserved if dropped. + private final boolean isExternal_; + + // If true, no errors are thrown if the table already exists. + private final boolean ifNotExists_; + + // Partitioned/distribute by parameters. + private final TableDataLayout dataLayout_; + + // True if analyze() has been called. + private boolean isAnalyzed_ = false; + + /** + * Set of table options. These options are grouped together for convenience while + * parsing CREATE TABLE statements. They are typically found at the end of CREATE + * TABLE statements. + */ + static class Options { + // Comment to attach to the table + final String comment; + + // Custom row format of the table. Leave null to specify default row format. + final RowFormat rowFormat; + + // Key/values to persist with table serde metadata. + final Map<String, String> serdeProperties; + + // File format of the table + final THdfsFileFormat fileFormat; + + // The HDFS location of where the table data will stored. + final HdfsUri location; + + // The HDFS caching op that should be applied to this table. + final HdfsCachingOp cachingOp; + + // Key/values to persist with table metadata. 
+ final Map<String, String> tblProperties; + + Options(String comment, RowFormat rowFormat, + Map<String, String> serdeProperties, THdfsFileFormat fileFormat, HdfsUri location, + HdfsCachingOp cachingOp, Map<String, String> tblProperties) { + this.comment = comment; + this.rowFormat = rowFormat; + Preconditions.checkNotNull(serdeProperties); + this.serdeProperties = serdeProperties; + this.fileFormat = fileFormat == null ? THdfsFileFormat.TEXT : fileFormat; + this.location = location; + this.cachingOp = cachingOp; + Preconditions.checkNotNull(tblProperties); + this.tblProperties = tblProperties; + } + + public Options(String comment) { + this(comment, RowFormat.DEFAULT_ROW_FORMAT, Maps.<String, String>newHashMap(), + THdfsFileFormat.TEXT, null, null, Maps.<String, String>newHashMap()); + } + } + + private Options options_; + + // Result of analysis. + private TableName fqTableName_; + + TableDef(TableName tableName, boolean isExternal, boolean ifNotExists) { + tableName_ = tableName; + isExternal_ = isExternal; + ifNotExists_ = ifNotExists; + dataLayout_ = TableDataLayout.createEmptyLayout(); + } + + public TableName getTblName() { + return fqTableName_ != null ? fqTableName_ : tableName_; + } + public String getTbl() { return tableName_.getTbl(); } + public boolean isAnalyzed() { return isAnalyzed_; } + List<ColumnDef> getColumnDefs() { return columnDefs_; } + List<ColumnDef> getPartitionColumnDefs() { + return dataLayout_.getPartitionColumnDefs(); + } + List<String> getPrimaryKeyColumnNames() { return primaryKeyColNames_; } + List<ColumnDef> getPrimaryKeyColumnDefs() { return primaryKeyColDefs_; } + boolean isExternal() { return isExternal_; } + boolean getIfNotExists() { return ifNotExists_; } + List<DistributeParam> getDistributeParams() { + return dataLayout_.getDistributeParams(); + } + void setOptions(Options options) { + Preconditions.checkNotNull(options); + options_ = options; + } + String getComment() { return options_.comment; } + Map<String, String> getTblProperties() { return options_.tblProperties; } + HdfsCachingOp getCachingOp() { return options_.cachingOp; } + HdfsUri getLocation() { return options_.location; } + Map<String, String> getSerdeProperties() { return options_.serdeProperties; } + THdfsFileFormat getFileFormat() { return options_.fileFormat; } + RowFormat getRowFormat() { return options_.rowFormat; } + + /** + * Analyzes the parameters of a CREATE TABLE statement. + */ + void analyze(Analyzer analyzer) throws AnalysisException { + Preconditions.checkState(tableName_ != null && !tableName_.isEmpty()); + fqTableName_ = analyzer.getFqTableName(getTblName()); + fqTableName_.analyze(); + analyzeColumnDefs(); + analyzePrimaryKeys(); + + if (analyzer.dbContainsTable(getTblName().getDb(), getTbl(), Privilege.CREATE) + && !getIfNotExists()) { + throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + getTblName()); + } + + analyzer.addAccessEvent(new TAccessEvent(getTblName().toString(), + TCatalogObjectType.TABLE, Privilege.CREATE.toString())); + + Preconditions.checkNotNull(options_); + analyzeOptions(analyzer); + isAnalyzed_ = true; + } + + /** + * Analyzes table and partition column definitions, checking whether all column + * names are unique. 
+ */ + private void analyzeColumnDefs() throws AnalysisException { + Set<String> colNames = Sets.newHashSet(); + for (ColumnDef colDef: columnDefs_) { + colDef.analyze(); + if (!colNames.add(colDef.getColName().toLowerCase())) { + throw new AnalysisException("Duplicate column name: " + colDef.getColName()); + } + } + for (ColumnDef colDef: getPartitionColumnDefs()) { + colDef.analyze(); + if (!colDef.getType().supportsTablePartitioning()) { + throw new AnalysisException( + String.format("Type '%s' is not supported as partition-column type " + + "in column: %s", colDef.getType().toSql(), colDef.getColName())); + } + if (!colNames.add(colDef.getColName().toLowerCase())) { + throw new AnalysisException("Duplicate column name: " + colDef.getColName()); + } + } + } + + /** + * Analyzes the primary key columns. Checks if the specified primary key columns exist + * in the table column definitions and if composite primary keys are properly defined + * using the PRIMARY KEY (col,..col) clause. + */ + private void analyzePrimaryKeys() throws AnalysisException { + for (ColumnDef colDef: columnDefs_) { + if (colDef.isPrimaryKey()) primaryKeyColDefs_.add(colDef); + } + if (primaryKeyColDefs_.size() > 1) { + throw new AnalysisException("Multiple primary keys specified. " + + "Composite primary keys can be specified using the " + + "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition."); + } + if (primaryKeyColNames_.isEmpty()) return; + if (!primaryKeyColDefs_.isEmpty()) { + throw new AnalysisException("Multiple primary keys specified. " + + "Composite primary keys can be specified using the " + + "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition."); + } + Map<String, ColumnDef> colDefsByColName = ColumnDef.mapByColumnNames(columnDefs_); + for (String colName: primaryKeyColNames_) { + colName = colName.toLowerCase(); + ColumnDef colDef = colDefsByColName.remove(colName); + if (colDef == null) { + if (ColumnDef.toColumnNames(primaryKeyColDefs_).contains(colName)) { + throw new AnalysisException(String.format("Column '%s' is listed multiple " + + "times as a PRIMARY KEY.", colName)); + } + throw new AnalysisException(String.format( + "PRIMARY KEY column '%s' does not exist in the table", colName)); + } + primaryKeyColDefs_.add(colDef); + } + } + + private void analyzeOptions(Analyzer analyzer) throws AnalysisException { + MetaStoreUtil.checkShortPropertyMap("Property", options_.tblProperties); + MetaStoreUtil.checkShortPropertyMap("Serde property", options_.serdeProperties); + + if (options_.location != null) { + options_.location.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE); + } + + if (options_.cachingOp != null) { + options_.cachingOp.analyze(analyzer); + if (options_.cachingOp.shouldCache() && options_.location != null && + !FileSystemUtil.isPathCacheable(options_.location.getPath())) { + throw new AnalysisException(String.format("Location '%s' cannot be cached. " + + "Please retry without caching: CREATE TABLE ... UNCACHED", + options_.location)); + } + } + + // Analyze 'skip.header.line.format' property. 
+ AlterTableSetTblProperties.analyzeSkipHeaderLineCount(options_.tblProperties); + analyzeRowFormat(analyzer); + } + + private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException { + if (options_.rowFormat == null) return; + if (options_.fileFormat == THdfsFileFormat.KUDU) { + throw new AnalysisException(String.format( + "ROW FORMAT cannot be specified for file format %s.", options_.fileFormat)); + } + + Byte fieldDelim = analyzeRowFormatValue(options_.rowFormat.getFieldDelimiter()); + Byte lineDelim = analyzeRowFormatValue(options_.rowFormat.getLineDelimiter()); + Byte escapeChar = analyzeRowFormatValue(options_.rowFormat.getEscapeChar()); + if (options_.fileFormat == THdfsFileFormat.TEXT) { + if (fieldDelim == null) fieldDelim = HdfsStorageDescriptor.DEFAULT_FIELD_DELIM; + if (lineDelim == null) lineDelim = HdfsStorageDescriptor.DEFAULT_LINE_DELIM; + if (escapeChar == null) escapeChar = HdfsStorageDescriptor.DEFAULT_ESCAPE_CHAR; + if (fieldDelim.equals(lineDelim)) { + throw new AnalysisException("Field delimiter and line delimiter have same " + + "value: byte " + fieldDelim); + } + if (fieldDelim.equals(escapeChar)) { + analyzer.addWarning("Field delimiter and escape character have same value: " + + "byte " + fieldDelim + ". Escape character will be ignored"); + } + if (lineDelim.equals(escapeChar)) { + analyzer.addWarning("Line delimiter and escape character have same value: " + + "byte " + lineDelim + ". Escape character will be ignored"); + } + } + } + + private Byte analyzeRowFormatValue(String value) throws AnalysisException { + if (value == null) return null; + Byte byteVal = HdfsStorageDescriptor.parseDelim(value); + if (byteVal == null) { + throw new AnalysisException("ESCAPED BY values and LINE/FIELD " + + "terminators must be specified as a single character or as a decimal " + + "value in the range [-128:127]: " + value); + } + return byteVal; + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java index b125987..aa24336 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java +++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java @@ -22,10 +22,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.impala.catalog.KuduTable; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.antlr.runtime.ANTLRStringStream; import org.antlr.runtime.Token; import org.apache.commons.lang.StringEscapeUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.parse.HiveLexer; @@ -35,16 +41,11 @@ import org.apache.impala.catalog.Function; import org.apache.impala.catalog.HBaseTable; import org.apache.impala.catalog.HdfsCompression; import org.apache.impala.catalog.HdfsFileFormat; +import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.RowFormat; import org.apache.impala.catalog.Table; import org.apache.impala.catalog.View; -import org.apache.impala.common.PrintUtils; -import com.google.common.base.Joiner; -import 
com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import org.apache.impala.util.KuduUtil; /** * Contains utility methods for creating SQL strings, for example, @@ -132,8 +133,9 @@ public class ToSqlUtils { } // TODO: Pass the correct compression, if applicable. return getCreateTableSql(stmt.getDb(), stmt.getTbl(), stmt.getComment(), colsSql, - partitionColsSql, stmt.getTblProperties(), stmt.getSerdeProperties(), - stmt.isExternal(), stmt.getIfNotExists(), stmt.getRowFormat(), + partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), null, + stmt.getTblProperties(), stmt.getSerdeProperties(), stmt.isExternal(), + stmt.getIfNotExists(), stmt.getRowFormat(), HdfsFileFormat.fromThrift(stmt.getFileFormat()), HdfsCompression.NONE, null, stmt.getLocation()); } @@ -152,7 +154,8 @@ public class ToSqlUtils { } // TODO: Pass the correct compression, if applicable. String createTableSql = getCreateTableSql(innerStmt.getDb(), innerStmt.getTbl(), - innerStmt.getComment(), null, partitionColsSql, innerStmt.getTblProperties(), + innerStmt.getComment(), null, partitionColsSql, + innerStmt.getTblPrimaryKeyColumnNames(), null, innerStmt.getTblProperties(), innerStmt.getSerdeProperties(), innerStmt.isExternal(), innerStmt.getIfNotExists(), innerStmt.getRowFormat(), HdfsFileFormat.fromThrift(innerStmt.getFileFormat()), HdfsCompression.NONE, null, @@ -169,6 +172,9 @@ public class ToSqlUtils { if (table instanceof View) return getCreateViewSql((View)table); org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable(); HashMap<String, String> properties = Maps.newHashMap(msTable.getParameters()); + if (properties.containsKey("transient_lastDdlTime")) { + properties.remove("transient_lastDdlTime"); + } boolean isExternal = msTable.getTableType() != null && msTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString()); String comment = properties.get("comment"); @@ -194,17 +200,40 @@ public class ToSqlUtils { Map<String, String> serdeParameters = msTable.getSd().getSerdeInfo().getParameters(); String storageHandlerClassName = table.getStorageHandlerClassName(); + List<String> primaryKeySql = Lists.newArrayList(); + String kuduDistributeByParams = null; if (table instanceof KuduTable) { + KuduTable kuduTable = (KuduTable) table; // Kudu tables don't use LOCATION syntax location = null; - format = null; + format = HdfsFileFormat.KUDU; // Kudu tables cannot use the Hive DDL syntax for the storage handler storageHandlerClassName = null; + properties.remove(KuduTable.KEY_STORAGE_HANDLER); + String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME); + Preconditions.checkNotNull(kuduTableName); + if (kuduTableName.equals(KuduUtil.getDefaultCreateKuduTableName( + table.getDb().getName(), table.getName()))) { + properties.remove(KuduTable.KEY_TABLE_NAME); + } + // Internal property, should not be exposed to the user. + properties.remove(StatsSetupConst.DO_NOT_UPDATE_STATS); + + if (!isExternal) { + primaryKeySql.addAll(kuduTable.getPrimaryKeyColumnNames()); + + List<String> paramsSql = Lists.newArrayList(); + for (DistributeParam param: kuduTable.getDistributeBy()) { + paramsSql.add(param.toSql()); + } + kuduDistributeByParams = Joiner.on(", ").join(paramsSql); + } } HdfsUri tableLocation = location == null ? 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 41573ed..733b2f2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -519,4 +519,8 @@ public abstract class Catalog {
     }
     return result;
   }
+
+  public static boolean isDefaultDb(String dbName) {
+    return DEFAULT_DB.equals(dbName.toLowerCase());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 41c8d62..149b00b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -23,7 +23,6 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -41,7 +40,6 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.ResourceType;
@@ -52,7 +50,6 @@ import org.apache.log4j.Logger;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.TException;
 
-import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
@@ -65,7 +62,6 @@ import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
@@ -79,7 +75,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.io.Files;
 
 /**
  * Specialized Catalog that implements the CatalogService specific Catalog
@@ -693,7 +688,7 @@ public class CatalogServiceCatalog extends Catalog {
    * Adds a table with the given name to the catalog and returns the new table,
    * loading the metadata if needed.
    */
-  public Table addTable(String dbName, String tblName) throws TableNotFoundException {
+  public Table addTable(String dbName, String tblName) {
     Db db = getDb(dbName);
     if (db == null) return null;
     Table incompleteTable =
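Two small notes on the catalog changes above: addTable() no longer declares TableNotFoundException (it already returns null when the database is missing), and the new Catalog.isDefaultDb() helper is a case-insensitive comparison against DEFAULT_DB. Assuming DEFAULT_DB holds the usual "default" database name, it behaves like this standalone sketch (the class name and constant value here are illustrative, not Impala code):

    // Sketch only; in Impala the constant lives on Catalog as DEFAULT_DB.
    public class IsDefaultDbSketch {
      private static final String DEFAULT_DB = "default";  // assumed value

      public static boolean isDefaultDb(String dbName) {
        return DEFAULT_DB.equals(dbName.toLowerCase());
      }

      public static void main(String[] args) {
        System.out.println(isDefaultDb("DEFAULT"));     // true
        System.out.println(isDefaultDb("functional"));  // false
      }
    }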

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index d6fb185..0ed67c6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -152,6 +152,11 @@ public class Db implements CatalogObject {
     return Lists.newArrayList(tableCache_.keySet());
   }
 
+  /**
+   * Returns the tables in the cache.
+   */
+  public List<Table> getTables() { return tableCache_.getValues(); }
+
   public boolean containsTable(String tableName) {
     return tableCache_.contains(tableName.toLowerCase());
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
index 86a65bd..e4fce60 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
@@ -31,8 +31,12 @@ import com.google.common.collect.Lists;
  * 2) the output format class
  * 3) the serialization library class
  * 4) whether scanning complex types from it is supported
+ * 5) whether the file format can skip complex columns in scans and just materialize
+ *    scalar typed columns
  *
  * Important note: Always keep consistent with the classes used in Hive.
+ * TODO: Kudu doesn't belong in this list. Either rename this enum or create a separate
+ * list of storage engines (see IMPALA-4178).
  */
 public enum HdfsFileFormat {
   RC_FILE("org.apache.hadoop.hive.ql.io.RCFileInputFormat",
@@ -57,7 +61,10 @@ public enum HdfsFileFormat {
   PARQUET("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
       "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
       "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
-      true, true);
+      true, true),
+  KUDU("org.apache.kudu.mapreduce.KuduTableInputFormat",
+      "org.apache.kudu.mapreduce.KuduTableOutputFormat",
+      "", false, false);
 
   private final String inputFormat_;
   private final String outputFormat_;
@@ -103,6 +110,7 @@ public enum HdfsFileFormat {
           .put(PARQUET_LEGACY_INPUT_FORMATS[0], PARQUET)
           .put(PARQUET_LEGACY_INPUT_FORMATS[1], PARQUET)
           .put(PARQUET_LEGACY_INPUT_FORMATS[2], PARQUET)
+          .put(KUDU.inputFormat(), KUDU)
           .build();
 
   /**
@@ -138,6 +146,7 @@ public enum HdfsFileFormat {
       case SEQUENCE_FILE: return HdfsFileFormat.SEQUENCE_FILE;
       case AVRO: return HdfsFileFormat.AVRO;
       case PARQUET: return HdfsFileFormat.PARQUET;
+      case KUDU: return HdfsFileFormat.KUDU;
       default:
         throw new RuntimeException("Unknown THdfsFileFormat: "
             + thriftFormat + " - should never happen!");
@@ -151,6 +160,7 @@ public enum HdfsFileFormat {
       case SEQUENCE_FILE: return THdfsFileFormat.SEQUENCE_FILE;
       case AVRO: return THdfsFileFormat.AVRO;
       case PARQUET: return THdfsFileFormat.PARQUET;
+      case KUDU: return THdfsFileFormat.KUDU;
       default:
         throw new RuntimeException("Unknown HdfsFormat: "
             + this + " - should never happen!");
@@ -173,6 +183,7 @@ public enum HdfsFileFormat {
       case SEQUENCE_FILE: return "SEQUENCEFILE";
       case AVRO: return "AVRO";
       case PARQUET: return "PARQUET";
+      case KUDU: return "KUDU";
       default:
         throw new RuntimeException("Unknown HdfsFormat: "
             + this + " - should never happen!");
@@ -230,6 +241,8 @@ public enum HdfsFileFormat {
       case AVRO:
       case PARQUET:
         return true;
+      case KUDU:
+        return false;
       default:
         throw new RuntimeException("Unknown HdfsFormat: "
             + this + " - should never happen!");
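Besides the new enum value, the change registers Kudu's input format class via .put(KUDU.inputFormat(), KUDU) in the existing class-name lookup, presumably so that metastore metadata naming org.apache.kudu.mapreduce.KuduTableInputFormat resolves to the KUDU value. A reduced sketch of that lookup idea; the map and enum below are illustrative stand-ins, not the real HdfsFileFormat internals:

    import com.google.common.collect.ImmutableMap;

    // Sketch of an input-format-class -> format lookup including Kudu's class name.
    public class FormatLookupSketch {
      enum Format { PARQUET, KUDU }

      static final ImmutableMap<String, Format> INPUT_FORMAT_TO_FORMAT = ImmutableMap.of(
          "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", Format.PARQUET,
          "org.apache.kudu.mapreduce.KuduTableInputFormat", Format.KUDU);

      public static void main(String[] args) {
        System.out.println(
            INPUT_FORMAT_TO_FORMAT.get("org.apache.kudu.mapreduce.KuduTableInputFormat"));
        // KUDU
      }
    }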

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index c416bee..3647256 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -87,12 +87,17 @@ public class ImpaladCatalog extends Catalog {
   // Object that is used to synchronize on and signal when a catalog update is received.
   private final Object catalogUpdateEventNotifier_ = new Object();
 
+  // The addresses of the Kudu masters to use if no Kudu masters were explicitly provided.
+  // Used during table creation.
+  private final String defaultKuduMasterHosts_;
+
   /**
    * C'tor used by tests that need to validate the ImpaladCatalog outside of the
    * CatalogServer.
    */
-  public ImpaladCatalog() {
+  public ImpaladCatalog(String defaultKuduMasterHosts) {
     super(false);
+    defaultKuduMasterHosts_ = defaultKuduMasterHosts;
   }
 
   /**
@@ -445,4 +450,5 @@ public class ImpaladCatalog extends Catalog {
   // Only used for testing.
   public void setIsReady(boolean isReady) { isReady_.set(isReady); }
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
+  public String getDefaultKuduMasterHosts() { return defaultKuduMasterHosts_; }
 }
