http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java new file mode 100644 index 0000000..7b59625 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import java.util.List; +import java.util.EnumSet; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.catalog.Db; +import com.cloudera.impala.catalog.HdfsTable; +import com.cloudera.impala.catalog.KuduTable; +import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient; +import com.cloudera.impala.catalog.Table; +import com.cloudera.impala.catalog.TableId; +import com.cloudera.impala.catalog.TableLoadingException; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.service.CatalogOpExecutor; +import com.cloudera.impala.thrift.THdfsFileFormat; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Represents a CREATE TABLE AS SELECT (CTAS) statement + * + * The statement supports an optional PARTITIONED BY clause. Its syntax and semantics + * follow the PARTITION feature of INSERT FROM SELECT statements: inside the PARTITIONED + * BY (...) column list the user must specify names of the columns to partition by. These + * column names must appear in the specified order at the end of the select statement. A + * remapping between columns of the source and destination tables is not possible, because + * the destination table does not yet exist. Specifying static values for the partition + * columns is also not possible, as their type needs to be deduced from columns in the + * select statement. + */ +public class CreateTableAsSelectStmt extends StatementBase { + private final CreateTableStmt createStmt_; + + // List of partition columns from the PARTITIONED BY (...) clause. Set to null if no + // partition was given. 
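// (Illustrative note, not part of this commit: for a statement such as
//  CREATE TABLE t PARTITIONED BY (year, month) AS SELECT id, int_col, year, month FROM src,
//  this list would hold ["year", "month"]; 'src' is a hypothetical source table.)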
+ private final List<String> partitionKeys_; + + ///////////////////////////////////////// + // BEGIN: Members that need to be reset() + + private final InsertStmt insertStmt_; + + // END: Members that need to be reset() + ///////////////////////////////////////// + + private final static EnumSet<THdfsFileFormat> SUPPORTED_INSERT_FORMATS = + EnumSet.of(THdfsFileFormat.PARQUET, THdfsFileFormat.TEXT); + + /** + * Builds a CREATE TABLE AS SELECT statement + */ + public CreateTableAsSelectStmt(CreateTableStmt createStmt, QueryStmt queryStmt, + List<String> partitionKeys) { + Preconditions.checkNotNull(queryStmt); + Preconditions.checkNotNull(createStmt); + createStmt_ = createStmt; + partitionKeys_ = partitionKeys; + List<PartitionKeyValue> pkvs = null; + if (partitionKeys != null) { + pkvs = Lists.newArrayList(); + for (String key: partitionKeys) { + pkvs.add(new PartitionKeyValue(key, null)); + } + } + insertStmt_ = new InsertStmt(null, createStmt.getTblName(), false, pkvs, + null, queryStmt, null, false); + } + + public QueryStmt getQueryStmt() { return insertStmt_.getQueryStmt(); } + public InsertStmt getInsertStmt() { return insertStmt_; } + public CreateTableStmt getCreateStmt() { return createStmt_; } + @Override + public String toSql() { return ToSqlUtils.getCreateTableSql(this); } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (isAnalyzed()) return; + super.analyze(analyzer); + + // The analysis for CTAS happens in two phases - the first phase happens before + // the target table exists and we want to validate the CREATE statement and the + // query portion of the insert statement. If this passes, analysis will be run + // over the full INSERT statement. To avoid duplicate registrations of table/colRefs, + // create a new root analyzer and clone the query statement for this initial pass. + Analyzer dummyRootAnalyzer = new Analyzer(analyzer.getCatalog(), + analyzer.getQueryCtx(), analyzer.getAuthzConfig()); + QueryStmt tmpQueryStmt = insertStmt_.getQueryStmt().clone(); + try { + Analyzer tmpAnalyzer = new Analyzer(dummyRootAnalyzer); + tmpAnalyzer.setUseHiveColLabels(true); + tmpQueryStmt.analyze(tmpAnalyzer); + // Subqueries need to be rewritten by the StmtRewriter first. + if (analyzer.containsSubquery()) return; + } finally { + // Record missing tables in the original analyzer. + analyzer.getMissingTbls().addAll(dummyRootAnalyzer.getMissingTbls()); + } + + // Add the columns from the partition clause to the create statement. + if (partitionKeys_ != null) { + int colCnt = tmpQueryStmt.getColLabels().size(); + int partColCnt = partitionKeys_.size(); + if (partColCnt >= colCnt) { + throw new AnalysisException(String.format("Number of partition columns (%s) " + + "must be smaller than the number of columns in the select statement (%s).", + partColCnt, colCnt)); + } + int firstCol = colCnt - partColCnt; + for (int i = firstCol, j = 0; i < colCnt; ++i, ++j) { + String partitionLabel = partitionKeys_.get(j); + String colLabel = tmpQueryStmt.getColLabels().get(i); + + // Ensure that partition columns are named and positioned at end of + // input column list. 
+ if (!partitionLabel.equals(colLabel)) { + throw new AnalysisException(String.format("Partition column name " + + "mismatch: %s != %s", partitionLabel, colLabel)); + } + + ColumnDef colDef = new ColumnDef(colLabel, null, null); + colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType()); + createStmt_.getPartitionColumnDefs().add(colDef); + } + // Remove partition columns from table column list. + tmpQueryStmt.getColLabels().subList(firstCol, colCnt).clear(); + } + + // Add the columns from the select statement to the create statement. + int colCnt = tmpQueryStmt.getColLabels().size(); + createStmt_.getColumnDefs().clear(); + for (int i = 0; i < colCnt; ++i) { + ColumnDef colDef = new ColumnDef( + tmpQueryStmt.getColLabels().get(i), null, null); + colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType()); + createStmt_.getColumnDefs().add(colDef); + } + createStmt_.analyze(analyzer); + + if (!SUPPORTED_INSERT_FORMATS.contains(createStmt_.getFileFormat())) { + throw new AnalysisException(String.format("CREATE TABLE AS SELECT " + + "does not support (%s) file format. Supported formats are: (%s)", + createStmt_.getFileFormat().toString().replace("_", ""), + "PARQUET, TEXTFILE")); + } + + // The full privilege check for the database will be done as part of the INSERT + // analysis. + Db db = analyzer.getDb(createStmt_.getDb(), Privilege.ANY); + if (db == null) { + throw new AnalysisException( + Analyzer.DB_DOES_NOT_EXIST_ERROR_MSG + createStmt_.getDb()); + } + + // Running analysis on the INSERT portion of the CTAS requires the target INSERT + // table to "exist". For CTAS the table does not exist yet, so create a "temp" + // table to run analysis against. The schema of this temp table should exactly + // match the schema of the table that will be created by running the CREATE + // statement. + org.apache.hadoop.hive.metastore.api.Table msTbl = + CatalogOpExecutor.createMetaStoreTable(createStmt_.toThrift()); + + try (MetaStoreClient client = analyzer.getCatalog().getMetaStoreClient()) { + // Set a valid location of this table using the same rules as the metastore. If the + // user specified a location for the table this will be a no-op. + msTbl.getSd().setLocation(analyzer.getCatalog().getTablePath(msTbl).toString()); + + // Create a "temp" table based off the given metastore.api.Table object. Normally, + // the CatalogService assigns all table IDs, but in this case we need to assign the + // "temp" table an ID locally. This table ID cannot conflict with any table in the + // SelectStmt (or the BE will be very confused). To ensure the ID is unique within + // this query, just assign it the invalid table ID. The CatalogServer will assign + // this table a proper ID once it is created there as part of the CTAS execution. + Table table = Table.fromMetastoreTable(TableId.createInvalidId(), db, msTbl); + Preconditions.checkState(table != null && + (table instanceof HdfsTable || table instanceof KuduTable)); + + table.load(true, client.getHiveClient(), msTbl); + insertStmt_.setTargetTable(table); + } catch (TableLoadingException e) { + throw new AnalysisException(e.getMessage(), e); + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e); + } + + // Finally, run analysis on the insert statement. + insertStmt_.analyze(analyzer); + } + + @Override + public void reset() { + super.reset(); + insertStmt_.reset(); + } +}
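The PARTITIONED BY ordering rule described in the class comment above can be illustrated with a small sketch in the style of the frontend analysis tests; the AnalyzesOk()/AnalysisError() helpers and the functional.alltypes test table are assumed to come from that test suite and are not part of this commit.

  // Partition columns must be named and must appear, in order, at the end of the select list.
  AnalyzesOk("create table t partitioned by (year, month) as " +
      "select id, int_col, year, month from functional.alltypes");
  // A partition column that is not the trailing select-list column fails analysis with the
  // mismatch error raised in CreateTableAsSelectStmt.analyze().
  AnalysisError("create table t partitioned by (year) as " +
      "select id, year, month from functional.alltypes",
      "Partition column name mismatch: year != month");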
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java new file mode 100644 index 0000000..0faf881 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_API_VER; +import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_CLASS; +import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_DATA_SRC_NAME; +import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_INIT_STRING; +import static com.cloudera.impala.catalog.DataSourceTable.TBL_PROP_LOCATION; + +import java.util.List; +import java.util.Map; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.catalog.DataSource; +import com.cloudera.impala.catalog.DataSourceTable; +import com.cloudera.impala.catalog.RowFormat; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.THdfsFileFormat; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.fs.permission.FsAction; + +/** + * Represents a CREATE TABLE statement for external data sources. Such tables + * reference an external data source (created with a CREATE DATA SOURCE statement) + * and the properties of that source are stored in the table properties because + * the metastore does not store the data sources themselves. + */ +public class CreateTableDataSrcStmt extends CreateTableStmt { + + public CreateTableDataSrcStmt(TableName tableName, List<ColumnDef> columnDefs, + String dataSourceName, String initString, String comment, boolean ifNotExists) { + super(tableName, columnDefs, Lists.<ColumnDef>newArrayList(), false, comment, + RowFormat.DEFAULT_ROW_FORMAT, THdfsFileFormat.TEXT, null, null, ifNotExists, + createInitialTableProperties(dataSourceName, initString), + Maps.<String, String>newHashMap(), null); + } + + /** + * Creates the initial map of table properties containing the name of the data + * source and the table init string. 
+ */ + private static Map<String, String> createInitialTableProperties( + String dataSourceName, String initString) { + Preconditions.checkNotNull(dataSourceName); + Map<String, String> tableProperties = Maps.newHashMap(); + tableProperties.put(TBL_PROP_DATA_SRC_NAME, dataSourceName.toLowerCase()); + tableProperties.put(TBL_PROP_INIT_STRING, Strings.nullToEmpty(initString)); + return tableProperties; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + super.analyze(analyzer); + String dataSourceName = getTblProperties().get(TBL_PROP_DATA_SRC_NAME); + DataSource dataSource = analyzer.getCatalog().getDataSource(dataSourceName); + if (dataSource == null) { + throw new AnalysisException("Data source does not exist: " + dataSourceName); + } + + for (ColumnDef col: getColumnDefs()) { + if (!DataSourceTable.isSupportedColumnType(col.getType())) { + throw new AnalysisException("Tables produced by an external data source do " + + "not support the column type: " + col.getType()); + } + } + // Add table properties from the DataSource catalog object now that we have access + // to the catalog. These are stored in the table metadata because DataSource catalog + // objects are not currently persisted. + String location = dataSource.getLocation(); + getTblProperties().put(TBL_PROP_LOCATION, location); + getTblProperties().put(TBL_PROP_CLASS, dataSource.getClassName()); + getTblProperties().put(TBL_PROP_API_VER, dataSource.getApiVersion()); + new HdfsUri(location).analyze(analyzer, Privilege.ALL, FsAction.READ); + // TODO: check class exists and implements API version + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java new file mode 100644 index 0000000..6695cac --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java @@ -0,0 +1,375 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package com.cloudera.impala.analysis; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; + +import parquet.hadoop.ParquetFileReader; +import parquet.hadoop.metadata.ParquetMetadata; +import parquet.schema.OriginalType; +import parquet.schema.PrimitiveType; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.catalog.ArrayType; +import com.cloudera.impala.catalog.HdfsCompression; +import com.cloudera.impala.catalog.HdfsFileFormat; +import com.cloudera.impala.catalog.MapType; +import com.cloudera.impala.catalog.RowFormat; +import com.cloudera.impala.catalog.ScalarType; +import com.cloudera.impala.catalog.StructField; +import com.cloudera.impala.catalog.StructType; +import com.cloudera.impala.catalog.Type; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.common.FileSystemUtil; +import com.cloudera.impala.thrift.THdfsFileFormat; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + + +/** + * Represents a CREATE TABLE tablename LIKE fileformat '/path/to/file' statement + * where the schema is inferred from the given file. Does not partition the table by + * default. + */ +public class CreateTableLikeFileStmt extends CreateTableStmt { + private final HdfsUri schemaLocation_; + private final THdfsFileFormat schemaFileFormat_; + private final static String ERROR_MSG = + "Failed to convert Parquet type\n%s\nto an Impala %s type:\n%s\n"; + + public CreateTableLikeFileStmt(TableName tableName, THdfsFileFormat schemaFileFormat, + HdfsUri schemaLocation, List<ColumnDef> partitionColumnDescs, + boolean isExternal, String comment, RowFormat rowFormat, + THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp, + boolean ifNotExists, Map<String, String> tblProperties, + Map<String, String> serdeProperties) { + super(tableName, new ArrayList<ColumnDef>(), partitionColumnDescs, + isExternal, comment, rowFormat, + fileFormat, location, cachingOp, ifNotExists, tblProperties, serdeProperties, + null); + schemaLocation_ = schemaLocation; + schemaFileFormat_ = schemaFileFormat; + } + + /** + * Reads the first block from the given HDFS file and returns the Parquet schema. + * Throws Analysis exception for any failure, such as failing to read the file + * or failing to parse the contents. 
+ */ + private static parquet.schema.MessageType loadParquetSchema(Path pathToFile) + throws AnalysisException { + try { + FileSystem fs = pathToFile.getFileSystem(FileSystemUtil.getConfiguration()); + if (!fs.isFile(pathToFile)) { + throw new AnalysisException("Cannot infer schema, path is not a file: " + + pathToFile); + } + } catch (IOException e) { + throw new AnalysisException("Failed to connect to filesystem:" + e); + } catch (IllegalArgumentException e) { + throw new AnalysisException(e.getMessage()); + } + ParquetMetadata readFooter = null; + try { + readFooter = ParquetFileReader.readFooter(FileSystemUtil.getConfiguration(), + pathToFile); + } catch (FileNotFoundException e) { + throw new AnalysisException("File not found: " + e); + } catch (IOException e) { + throw new AnalysisException("Failed to open file as a parquet file: " + e); + } catch (RuntimeException e) { + // Parquet throws a generic RuntimeException when reading a non-parquet file + if (e.toString().contains("is not a Parquet file")) { + throw new AnalysisException("File is not a parquet file: " + pathToFile); + } + // otherwise, who knows what we caught, throw it back up + throw e; + } + return readFooter.getFileMetaData().getSchema(); + } + + /** + * Converts a "primitive" Parquet type to an Impala type. + * A primitive type is a non-nested type with no annotations. + */ + private static Type convertPrimitiveParquetType(parquet.schema.Type parquetType) + throws AnalysisException { + Preconditions.checkState(parquetType.isPrimitive()); + PrimitiveType prim = parquetType.asPrimitiveType(); + switch (prim.getPrimitiveTypeName()) { + case BINARY: return Type.STRING; + case BOOLEAN: return Type.BOOLEAN; + case DOUBLE: return Type.DOUBLE; + case FIXED_LEN_BYTE_ARRAY: + throw new AnalysisException( + "Unsupported parquet type FIXED_LEN_BYTE_ARRAY for field " + + parquetType.getName()); + case FLOAT: return Type.FLOAT; + case INT32: return Type.INT; + case INT64: return Type.BIGINT; + case INT96: return Type.TIMESTAMP; + default: + Preconditions.checkState(false, "Unexpected parquet primitive type: " + + prim.getPrimitiveTypeName()); + return null; + } + } + + /** + * Converts a Parquet group type to an Impala map Type. We support both standard + * Parquet map representations, as well as legacy. 
Legacy representations are handled + * according to this specification: + * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1 + * + * Standard representation of a map in Parquet: + * <optional | required> group <name> (MAP) { <-- outerGroup is pointing at this + * repeated group key_value { + * required <key-type> key; + * <optional | required> <value-type> value; + * } + * } + */ + private static MapType convertMap(parquet.schema.GroupType outerGroup) + throws AnalysisException { + if (outerGroup.getFieldCount() != 1){ + throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(), + "MAP", "The logical MAP type must have exactly 1 inner field.")); + } + + parquet.schema.Type innerField = outerGroup.getType(0); + if (!innerField.isRepetition(parquet.schema.Type.Repetition.REPEATED)){ + throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(), + "MAP", "The logical MAP type must have a repeated inner field.")); + } + if (innerField.isPrimitive()) { + throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(), + "MAP", "The inner field of the logical MAP type must be a group.")); + } + + parquet.schema.GroupType innerGroup = innerField.asGroupType(); + // It does not matter whether innerGroup has an annotation or not (for example it may + // be annotated with MAP_KEY_VALUE). We treat the case that innerGroup has an + // annotation and the case the innerGroup does not have an annotation the same. + if (innerGroup.getFieldCount() != 2) { + throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(), + "MAP", "The inner field of the logical MAP type must have exactly 2 fields.")); + } + + parquet.schema.Type key = innerGroup.getType(0); + if (!key.getName().equals("key")) { + throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(), + "MAP", "The name of the first field of the inner field of the logical MAP " + + "type must be 'key'")); + } + if (!key.isPrimitive()) { + throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(), + "MAP", "The key type of the logical MAP type must be primitive.")); + } + parquet.schema.Type value = innerGroup.getType(1); + if (!value.getName().equals("value")) { + throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(), + "MAP", "The name of the second field of the inner field of the logical MAP " + + "type must be 'value'")); + } + + return new MapType(convertParquetType(key), convertParquetType(value)); + } + + /** + * Converts a Parquet group type to an Impala struct Type. + */ + private static StructType convertStruct(parquet.schema.GroupType outerGroup) + throws AnalysisException { + ArrayList<StructField> structFields = new ArrayList<StructField>(); + for (parquet.schema.Type field: outerGroup.getFields()) { + StructField f = new StructField(field.getName(), convertParquetType(field)); + structFields.add(f); + } + return new StructType(structFields); + } + + /** + * Converts a Parquet group type to an Impala array Type. We can handle the standard + * representation, but also legacy representations for backwards compatibility. 
+ * Legacy representations are handled according to this specification: + * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules + * + * Standard representation of an array in Parquet: + * <optional | required> group <name> (LIST) { <-- outerGroup is pointing at this + * repeated group list { + * <optional | required> <element-type> element; + * } + * } + */ + private static ArrayType convertArray(parquet.schema.GroupType outerGroup) + throws AnalysisException { + if (outerGroup.getFieldCount() != 1) { + throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(), + "LIST", "The logical LIST type must have exactly 1 inner field.")); + } + + parquet.schema.Type innerField = outerGroup.getType(0); + if (!innerField.isRepetition(parquet.schema.Type.Repetition.REPEATED)) { + throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(), + "LIST", "The inner field of the logical LIST type must be repeated.")); + } + if (innerField.isPrimitive() || innerField.getOriginalType() != null) { + // From the Parquet Spec: + // 1. If the repeated field is not a group then it's type is the element type. + // + // If innerField is a group, but originalType is not null, the element type is + // based on the logical type. + return new ArrayType(convertParquetType(innerField)); + } + + parquet.schema.GroupType innerGroup = innerField.asGroupType(); + if (innerGroup.getFieldCount() != 1) { + // From the Parquet Spec: + // 2. If the repeated field is a group with multiple fields, then it's type is a + // struct. + return new ArrayType(convertStruct(innerGroup)); + } + + return new ArrayType(convertParquetType(innerGroup.getType(0))); + } + + /** + * Converts a "logical" Parquet type to an Impala column type. + * A Parquet type is considered logical when it has an annotation. The annotation is + * stored as a "OriginalType". The Parquet documentation refers to these as logical + * types, so we use that terminology here. + */ + private static Type convertLogicalParquetType(parquet.schema.Type parquetType) + throws AnalysisException { + OriginalType orig = parquetType.getOriginalType(); + if (orig == OriginalType.LIST) { + return convertArray(parquetType.asGroupType()); + } + if (orig == OriginalType.MAP || orig == OriginalType.MAP_KEY_VALUE) { + // MAP_KEY_VALUE annotation should not be used any more. However, according to the + // Parquet spec, some existing data incorrectly uses MAP_KEY_VALUE in place of MAP. + // For backward-compatibility, a group annotated with MAP_KEY_VALUE that is not + // contained by a MAP-annotated group should be handled as a MAP-annotated group. + return convertMap(parquetType.asGroupType()); + } + + PrimitiveType prim = parquetType.asPrimitiveType(); + if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY && + orig == OriginalType.UTF8) { + // UTF8 is the type annotation Parquet uses for strings + // We check to make sure it applies to BINARY to avoid errors if there is a bad + // annotation. + return Type.STRING; + } + + if (orig == OriginalType.DECIMAL) { + return ScalarType.createDecimalType(prim.getDecimalMetadata().getPrecision(), + prim.getDecimalMetadata().getScale()); + } + + throw new AnalysisException( + "Unsupported logical parquet type " + orig + " (primitive type is " + + prim.getPrimitiveTypeName().name() + ") for field " + + parquetType.getName()); + } + + /** + * Converts a Parquet type into an Impala type. 
+ */ + private static Type convertParquetType(parquet.schema.Type field) + throws AnalysisException { + Type type = null; + // TODO for 2.3: If a field is not annotated with LIST, it can still be sometimes + // interpreted as an array. The following 2 examples should be interpreted as an array + // of integers, but this is currently not done. + // 1. repeated int int_col; + // 2. required group int_arr { + // repeated group list { + // required int element; + // } + // } + if (field.getOriginalType() != null) { + type = convertLogicalParquetType(field); + } else if (field.isPrimitive()) { + type = convertPrimitiveParquetType(field); + } else { + // If field is not primitive, it must be a struct. + type = convertStruct(field.asGroupType()); + } + return type; + } + + /** + * Parses a Parquet file stored in HDFS and returns the corresponding Impala schema. + * This fails with an analysis exception if any errors occur reading the file, + * parsing the Parquet schema, or if the Parquet types cannot be represented in Impala. + */ + private static List<ColumnDef> extractParquetSchema(HdfsUri location) + throws AnalysisException { + parquet.schema.MessageType parquetSchema = loadParquetSchema(location.getPath()); + List<parquet.schema.Type> fields = parquetSchema.getFields(); + List<ColumnDef> schema = new ArrayList<ColumnDef>(); + + for (parquet.schema.Type field: fields) { + Type type = convertParquetType(field); + Preconditions.checkNotNull(type); + String colName = field.getName(); + schema.add(new ColumnDef(colName, new TypeDef(type), + "Inferred from Parquet file.")); + } + return schema; + } + + @Override + public String toSql() { + ArrayList<String> colsSql = Lists.newArrayList(); + ArrayList<String> partitionColsSql = Lists.newArrayList(); + HdfsCompression compression = HdfsCompression.fromFileName( + schemaLocation_.toString()); + String s = ToSqlUtils.getCreateTableSql(getDb(), + getTbl() + " __LIKE_FILEFORMAT__ ", getComment(), colsSql, partitionColsSql, + getTblProperties(), getSerdeProperties(), isExternal(), getIfNotExists(), + getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()), + compression, null, getLocation()); + s = s.replace("__LIKE_FILEFORMAT__", "LIKE " + schemaFileFormat_ + " " + + schemaLocation_.toString()); + return s; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + schemaLocation_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE); + switch (schemaFileFormat_) { + case PARQUET: + getColumnDefs().addAll(extractParquetSchema(schemaLocation_)); + break; + default: + throw new AnalysisException("Unsupported file type for schema inference: " + + schemaFileFormat_); + } + super.analyze(analyzer); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java new file mode 100644 index 0000000..a7e2038 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import org.apache.hadoop.fs.permission.FsAction; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.TAccessEvent; +import com.cloudera.impala.thrift.TCatalogObjectType; +import com.cloudera.impala.thrift.TCreateTableLikeParams; +import com.cloudera.impala.thrift.THdfsFileFormat; +import com.cloudera.impala.thrift.TTableName; +import com.google.common.base.Preconditions; + +/** + * Represents a CREATE TABLE LIKE statement which creates a new table based on + * a copy of an existing table definition. + */ +public class CreateTableLikeStmt extends StatementBase { + private final TableName tableName_; + private final TableName srcTableName_; + private final boolean isExternal_; + private final String comment_; + private final THdfsFileFormat fileFormat_; + private final HdfsUri location_; + private final boolean ifNotExists_; + + // Set during analysis + private String dbName_; + private String srcDbName_; + private String owner_; + + /** + * Builds a CREATE TABLE LIKE statement + * @param tableName - Name of the new table + * @param srcTableName - Name of the source table (table to copy) + * @param isExternal - If true, the table's data will be preserved if dropped. + * @param comment - Comment to attach to the table + * @param fileFormat - File format of the table + * @param location - The HDFS location of where the table data will stored. + * @param ifNotExists - If true, no errors are thrown if the table already exists + */ + public CreateTableLikeStmt(TableName tableName, TableName srcTableName, + boolean isExternal, String comment, THdfsFileFormat fileFormat, HdfsUri location, + boolean ifNotExists) { + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(srcTableName); + this.tableName_ = tableName; + this.srcTableName_ = srcTableName; + this.isExternal_ = isExternal; + this.comment_ = comment; + this.fileFormat_ = fileFormat; + this.location_ = location; + this.ifNotExists_ = ifNotExists; + } + + public String getTbl() { return tableName_.getTbl(); } + public String getSrcTbl() { return srcTableName_.getTbl(); } + public boolean isExternal() { return isExternal_; } + public boolean getIfNotExists() { return ifNotExists_; } + public String getComment() { return comment_; } + public THdfsFileFormat getFileFormat() { return fileFormat_; } + public HdfsUri getLocation() { return location_; } + + /** + * Can only be called after analysis, returns the name of the database the table will + * be created within. + */ + public String getDb() { + Preconditions.checkNotNull(dbName_); + return dbName_; + } + + /** + * Can only be called after analysis, returns the name of the database the table will + * be created within. 
+ */ + public String getSrcDb() { + Preconditions.checkNotNull(srcDbName_); + return srcDbName_; + } + + public String getOwner() { + Preconditions.checkNotNull(owner_); + return owner_; + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder("CREATE "); + if (isExternal_) sb.append("EXTERNAL "); + sb.append("TABLE "); + if (ifNotExists_) sb.append("IF NOT EXISTS "); + if (tableName_.getDb() != null) sb.append(tableName_.getDb() + "."); + sb.append(tableName_.getTbl() + " LIKE "); + if (srcTableName_.getDb() != null) sb.append(srcTableName_.getDb() + "."); + sb.append(srcTableName_.getTbl()); + if (comment_ != null) sb.append(" COMMENT '" + comment_ + "'"); + if (fileFormat_ != null) sb.append(" STORED AS " + fileFormat_); + if (location_ != null) sb.append(" LOCATION '" + location_ + "'"); + return sb.toString(); + } + + public TCreateTableLikeParams toThrift() { + TCreateTableLikeParams params = new TCreateTableLikeParams(); + params.setTable_name(new TTableName(getDb(), getTbl())); + params.setSrc_table_name(new TTableName(getSrcDb(), getSrcTbl())); + params.setOwner(getOwner()); + params.setIs_external(isExternal()); + params.setComment(comment_); + if (fileFormat_ != null) params.setFile_format(fileFormat_); + params.setLocation(location_ == null ? null : location_.toString()); + params.setIf_not_exists(getIfNotExists()); + return params; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + Preconditions.checkState(tableName_ != null && !tableName_.isEmpty()); + Preconditions.checkState(srcTableName_ != null && !srcTableName_.isEmpty()); + // Make sure the source table exists and the user has permission to access it. + srcDbName_ = analyzer + .getTable(srcTableName_, Privilege.VIEW_METADATA) + .getDb().getName(); + tableName_.analyze(); + dbName_ = analyzer.getTargetDbName(tableName_); + owner_ = analyzer.getUser().getName(); + + if (analyzer.dbContainsTable(dbName_, tableName_.getTbl(), Privilege.CREATE) && + !ifNotExists_) { + throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + + String.format("%s.%s", dbName_, getTbl())); + } + analyzer.addAccessEvent(new TAccessEvent(dbName_ + "." + tableName_.getTbl(), + TCatalogObjectType.TABLE, Privilege.CREATE.toString())); + + if (location_ != null) { + location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java new file mode 100644 index 0000000..f7b683f --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; +import org.apache.hadoop.fs.permission.FsAction; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.catalog.HdfsStorageDescriptor; +import com.cloudera.impala.catalog.KuduTable; +import com.cloudera.impala.catalog.RowFormat; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.common.FileSystemUtil; +import com.cloudera.impala.thrift.TAccessEvent; +import com.cloudera.impala.thrift.TCatalogObjectType; +import com.cloudera.impala.thrift.TCreateTableParams; +import com.cloudera.impala.thrift.THdfsFileFormat; +import com.cloudera.impala.thrift.TTableName; +import com.cloudera.impala.util.AvroSchemaConverter; +import com.cloudera.impala.util.AvroSchemaParser; +import com.cloudera.impala.util.AvroSchemaUtils; +import com.cloudera.impala.util.KuduUtil; +import com.cloudera.impala.util.MetaStoreUtil; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * Represents a CREATE TABLE statement. + */ +public class CreateTableStmt extends StatementBase { + private List<ColumnDef> columnDefs_; + private final String comment_; + private final boolean isExternal_; + private final boolean ifNotExists_; + private final THdfsFileFormat fileFormat_; + private final ArrayList<ColumnDef> partitionColDefs_; + private final RowFormat rowFormat_; + private TableName tableName_; + private final Map<String, String> tblProperties_; + private final Map<String, String> serdeProperties_; + private final HdfsCachingOp cachingOp_; + private HdfsUri location_; + private final List<DistributeParam> distributeParams_; + + // Set during analysis + private String owner_; + + /** + * Builds a CREATE TABLE statement + * @param tableName - Name of the new table + * @param columnDefs - List of column definitions for the table + * @param partitionColumnDefs - List of partition column definitions for the table + * @param isExternal - If true, the table's data will be preserved if dropped. + * @param comment - Comment to attach to the table + * @param rowFormat - Custom row format of the table. Use RowFormat.DEFAULT_ROW_FORMAT + * to specify default row format. + * @param fileFormat - File format of the table + * @param location - The HDFS location of where the table data will stored. + * @param cachingOp - The HDFS caching op that should be applied to this table. + * @param ifNotExists - If true, no errors are thrown if the table already exists. + * @param tblProperties - Optional map of key/values to persist with table metadata. + * @param serdeProperties - Optional map of key/values to persist with table serde + * metadata. 
+ */ + public CreateTableStmt(TableName tableName, List<ColumnDef> columnDefs, + List<ColumnDef> partitionColumnDefs, boolean isExternal, String comment, + RowFormat rowFormat, THdfsFileFormat fileFormat, HdfsUri location, + HdfsCachingOp cachingOp, boolean ifNotExists, Map<String, String> tblProperties, + Map<String, String> serdeProperties, List<DistributeParam> distributeParams) { + Preconditions.checkNotNull(columnDefs); + Preconditions.checkNotNull(partitionColumnDefs); + Preconditions.checkNotNull(fileFormat); + Preconditions.checkNotNull(rowFormat); + Preconditions.checkNotNull(tableName); + + columnDefs_ = Lists.newArrayList(columnDefs); + comment_ = comment; + isExternal_ = isExternal; + ifNotExists_ = ifNotExists; + fileFormat_ = fileFormat; + location_ = location; + cachingOp_ = cachingOp; + partitionColDefs_ = Lists.newArrayList(partitionColumnDefs); + rowFormat_ = rowFormat; + tableName_ = tableName; + tblProperties_ = tblProperties; + serdeProperties_ = serdeProperties; + unescapeProperties(tblProperties_); + unescapeProperties(serdeProperties_); + distributeParams_ = distributeParams; + } + + /** + * Copy c'tor. + */ + public CreateTableStmt(CreateTableStmt other) { + columnDefs_ = Lists.newArrayList(other.columnDefs_); + comment_ = other.comment_; + isExternal_ = other.isExternal_; + ifNotExists_ = other.ifNotExists_; + fileFormat_ = other.fileFormat_; + location_ = other.location_; + cachingOp_ = other.cachingOp_; + partitionColDefs_ = Lists.newArrayList(other.partitionColDefs_); + rowFormat_ = other.rowFormat_; + tableName_ = other.tableName_; + tblProperties_ = other.tblProperties_; + serdeProperties_ = other.serdeProperties_; + distributeParams_ = other.distributeParams_; + } + + @Override + public CreateTableStmt clone() { return new CreateTableStmt(this); } + + public String getTbl() { return tableName_.getTbl(); } + public TableName getTblName() { return tableName_; } + public List<ColumnDef> getColumnDefs() { return columnDefs_; } + public List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; } + public String getComment() { return comment_; } + public boolean isExternal() { return isExternal_; } + public boolean getIfNotExists() { return ifNotExists_; } + public HdfsUri getLocation() { return location_; } + public void setLocation(HdfsUri location) { this.location_ = location; } + public THdfsFileFormat getFileFormat() { return fileFormat_; } + public RowFormat getRowFormat() { return rowFormat_; } + public Map<String, String> getTblProperties() { return tblProperties_; } + public Map<String, String> getSerdeProperties() { return serdeProperties_; } + + /** + * Can only be called after analysis, returns the owner of this table (the user from + * the current session). + */ + public String getOwner() { + Preconditions.checkNotNull(owner_); + return owner_; + } + + /** + * Can only be called after analysis, returns the name of the database the table will + * be created within. 
+ */ + public String getDb() { + Preconditions.checkState(isAnalyzed()); + return tableName_.getDb(); + } + + @Override + public String toSql() { return ToSqlUtils.getCreateTableSql(this); } + + public TCreateTableParams toThrift() { + TCreateTableParams params = new TCreateTableParams(); + params.setTable_name(new TTableName(getDb(), getTbl())); + for (ColumnDef col: getColumnDefs()) { + params.addToColumns(col.toThrift()); + } + for (ColumnDef col: getPartitionColumnDefs()) { + params.addToPartition_columns(col.toThrift()); + } + params.setOwner(getOwner()); + params.setIs_external(isExternal()); + params.setComment(comment_); + params.setLocation(location_ == null ? null : location_.toString()); + if (cachingOp_ != null) params.setCache_op(cachingOp_.toThrift()); + params.setRow_format(rowFormat_.toThrift()); + params.setFile_format(fileFormat_); + params.setIf_not_exists(getIfNotExists()); + if (tblProperties_ != null) params.setTable_properties(tblProperties_); + if (serdeProperties_ != null) params.setSerde_properties(serdeProperties_); + if (distributeParams_ != null) { + for (DistributeParam d : distributeParams_) { + params.addToDistribute_by(d.toThrift()); + } + } + return params; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + super.analyze(analyzer); + Preconditions.checkState(tableName_ != null && !tableName_.isEmpty()); + tableName_ = analyzer.getFqTableName(tableName_); + tableName_.analyze(); + owner_ = analyzer.getUser().getName(); + + MetaStoreUtil.checkShortPropertyMap("Property", tblProperties_); + MetaStoreUtil.checkShortPropertyMap("Serde property", serdeProperties_); + + if (analyzer.dbContainsTable(tableName_.getDb(), tableName_.getTbl(), + Privilege.CREATE) && !ifNotExists_) { + throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + tableName_); + } + + analyzer.addAccessEvent(new TAccessEvent(tableName_.toString(), + TCatalogObjectType.TABLE, Privilege.CREATE.toString())); + + // Only Avro tables can have empty column defs because they can infer them from + // the Avro schema. + if (columnDefs_.isEmpty() && fileFormat_ != THdfsFileFormat.AVRO) { + throw new AnalysisException("Table requires at least 1 column"); + } + + if (location_ != null) { + location_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE); + } + + analyzeRowFormat(analyzer); + + // Check that all the column names are valid and unique. + analyzeColumnDefs(analyzer); + + if (getTblProperties() != null && KuduTable.KUDU_STORAGE_HANDLER.equals( + getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) { + analyzeKuduTable(analyzer); + } else if (distributeParams_ != null) { + throw new AnalysisException("Only Kudu tables can use DISTRIBUTE BY clause."); + } + + if (fileFormat_ == THdfsFileFormat.AVRO) { + columnDefs_ = analyzeAvroSchema(analyzer); + if (columnDefs_.isEmpty()) { + throw new AnalysisException( + "An Avro table requires column definitions or an Avro schema."); + } + AvroSchemaUtils.setFromSerdeComment(columnDefs_); + analyzeColumnDefs(analyzer); + } + + if (cachingOp_ != null) { + cachingOp_.analyze(analyzer); + if (cachingOp_.shouldCache() && location_ != null && + !FileSystemUtil.isPathCacheable(location_.getPath())) { + throw new AnalysisException(String.format("Location '%s' cannot be cached. " + + "Please retry without caching: CREATE TABLE %s ... UNCACHED", + location_.toString(), tableName_)); + } + } + + // Analyze 'skip.header.line.format' property. 
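// (Illustrative note, not part of this commit: the property checked below is
//  'skip.header.line.count', e.g. TBLPROPERTIES ('skip.header.line.count'='1')
//  to skip one header line when scanning text files.)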
+ if (tblProperties_ != null) { + AlterTableSetTblProperties.analyzeSkipHeaderLineCount(tblProperties_); + } + } + + private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException { + Byte fieldDelim = analyzeRowFormatValue(rowFormat_.getFieldDelimiter()); + Byte lineDelim = analyzeRowFormatValue(rowFormat_.getLineDelimiter()); + Byte escapeChar = analyzeRowFormatValue(rowFormat_.getEscapeChar()); + if (fileFormat_ == THdfsFileFormat.TEXT) { + if (fieldDelim == null) fieldDelim = HdfsStorageDescriptor.DEFAULT_FIELD_DELIM; + if (lineDelim == null) lineDelim = HdfsStorageDescriptor.DEFAULT_LINE_DELIM; + if (escapeChar == null) escapeChar = HdfsStorageDescriptor.DEFAULT_ESCAPE_CHAR; + if (fieldDelim != null && lineDelim != null && fieldDelim.equals(lineDelim)) { + throw new AnalysisException("Field delimiter and line delimiter have same " + + "value: byte " + fieldDelim); + } + if (fieldDelim != null && escapeChar != null && fieldDelim.equals(escapeChar)) { + analyzer.addWarning("Field delimiter and escape character have same value: " + + "byte " + fieldDelim + ". Escape character will be ignored"); + } + if (lineDelim != null && escapeChar != null && lineDelim.equals(escapeChar)) { + analyzer.addWarning("Line delimiter and escape character have same value: " + + "byte " + lineDelim + ". Escape character will be ignored"); + } + } + } + + /** + * Analyzes columnDefs_ and partitionColDefs_ checking whether all column + * names are unique. + */ + private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException { + Set<String> colNames = Sets.newHashSet(); + for (ColumnDef colDef: columnDefs_) { + colDef.analyze(); + if (!colNames.add(colDef.getColName().toLowerCase())) { + throw new AnalysisException("Duplicate column name: " + colDef.getColName()); + } + } + for (ColumnDef colDef: partitionColDefs_) { + colDef.analyze(); + if (!colDef.getType().supportsTablePartitioning()) { + throw new AnalysisException( + String.format("Type '%s' is not supported as partition-column type " + + "in column: %s", colDef.getType().toSql(), colDef.getColName())); + } + if (!colNames.add(colDef.getColName().toLowerCase())) { + throw new AnalysisException("Duplicate column name: " + colDef.getColName()); + } + } + } + + /** + * Analyzes the Avro schema and compares it with the columnDefs_ to detect + * inconsistencies. Returns a list of column descriptors that should be + * used for creating the table (possibly identical to columnDefs_). + */ + private List<ColumnDef> analyzeAvroSchema(Analyzer analyzer) + throws AnalysisException { + Preconditions.checkState(fileFormat_ == THdfsFileFormat.AVRO); + // Look for the schema in TBLPROPERTIES and in SERDEPROPERTIES, with latter + // taking precedence. + List<Map<String, String>> schemaSearchLocations = Lists.newArrayList(); + schemaSearchLocations.add(serdeProperties_); + schemaSearchLocations.add(tblProperties_); + String avroSchema = null; + List<ColumnDef> avroCols = null; // parsed from avroSchema + try { + avroSchema = AvroSchemaUtils.getAvroSchema(schemaSearchLocations); + if (avroSchema == null) { + // No Avro schema was explicitly set in the serde or table properties, so infer + // the Avro schema from the column definitions. 
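// (Illustrative note, not part of this commit: an explicit schema would normally be
//  supplied through e.g. TBLPROPERTIES ('avro.schema.literal'='{...}') or
//  ('avro.schema.url'='hdfs://.../schema.avsc'); only when neither is present are the
//  CREATE TABLE column definitions converted to an Avro schema here.)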
+ Schema inferredSchema = AvroSchemaConverter.convertColumnDefs( + columnDefs_, tableName_.toString()); + avroSchema = inferredSchema.toString(); + } + if (Strings.isNullOrEmpty(avroSchema)) { + throw new AnalysisException("Avro schema is null or empty: " + + tableName_.toString()); + } + avroCols = AvroSchemaParser.parse(avroSchema); + } catch (SchemaParseException e) { + throw new AnalysisException(String.format( + "Error parsing Avro schema for table '%s': %s", tableName_.toString(), + e.getMessage())); + } + Preconditions.checkNotNull(avroCols); + + // Analyze the Avro schema to detect inconsistencies with the columnDefs_. + // In case of inconsistencies, the column defs are ignored in favor of the Avro + // schema for simplicity and, in particular, to enable COMPUTE STATS (IMPALA-1104). + StringBuilder warning = new StringBuilder(); + List<ColumnDef> reconciledColDefs = + AvroSchemaUtils.reconcileSchemas(columnDefs_, avroCols, warning); + if (warning.length() > 0) analyzer.addWarning(warning.toString()); + return reconciledColDefs; + } + + private void analyzeKuduTable(Analyzer analyzer) throws AnalysisException { + // Validate that Kudu table is correctly specified. + if (!KuduTable.tableParamsAreValid(getTblProperties())) { + throw new AnalysisException("Kudu table is missing parameters " + + String.format("in table properties. Please verify if %s, %s, and %s are " + + "present and have valid values.", + KuduTable.KEY_TABLE_NAME, KuduTable.KEY_MASTER_ADDRESSES, + KuduTable.KEY_KEY_COLUMNS)); + } + + // Kudu table cannot be a cached table + if (cachingOp_ != null) { + throw new AnalysisException("A Kudu table cannot be cached in HDFS."); + } + + if (distributeParams_ != null) { + if (isExternal_) { + throw new AnalysisException( + "The DISTRIBUTE BY clause may not be specified for external tables."); + } + + List<String> keyColumns = KuduUtil.parseKeyColumnsAsList( + getTblProperties().get(KuduTable.KEY_KEY_COLUMNS)); + for (DistributeParam d : distributeParams_) { + // If the columns are not set, default to all key columns + if (d.getColumns() == null) d.setColumns(keyColumns); + d.analyze(analyzer); + } + } else if (!isExternal_) { + throw new AnalysisException( + "A data distribution must be specified using the DISTRIBUTE BY clause."); + } + } + + private Byte analyzeRowFormatValue(String value) throws AnalysisException { + if (value == null) return null; + Byte byteVal = HdfsStorageDescriptor.parseDelim(value); + if (byteVal == null) { + throw new AnalysisException("ESCAPED BY values and LINE/FIELD " + + "terminators must be specified as a single character or as a decimal " + + "value in the range [-128:127]: " + value); + } + return byteVal; + } + + /** + * Unescapes all values in the property map. 
+ */ + public static void unescapeProperties(Map<String, String> propertyMap) { + if (propertyMap == null) return; + for (Map.Entry<String, String> kv : propertyMap.entrySet()) { + propertyMap.put(kv.getKey(), + new StringLiteral(kv.getValue()).getUnescapedValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java new file mode 100644 index 0000000..46b0003 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateUdaStmt.java @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import java.util.ArrayList; +import java.util.HashMap; + +import com.cloudera.impala.catalog.AggregateFunction; +import com.cloudera.impala.catalog.Function; +import com.cloudera.impala.catalog.PrimitiveType; +import com.cloudera.impala.catalog.Type; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.TFunctionBinaryType; +import com.cloudera.impala.thrift.TSymbolType; +import com.google.common.base.Preconditions; + +/** + * Represents a CREATE AGGREGATE FUNCTION statement. + */ +public class CreateUdaStmt extends CreateFunctionStmtBase { + private final TypeDef intermediateTypeDef_; + + /** + * Builds a CREATE AGGREGATE FUNCTION statement + * @param fnName - Name of the function + * @param fnArgs - List of types for the arguments to this function + * @param retType - The type this function returns. + * @param intermediateType_- The type used for the intermediate data. + * @param location - Path in HDFS containing the UDA. + * @param ifNotExists - If true, no errors are thrown if the function already exists + * @param additionalArgs - Key/Value pairs for additional arguments. The keys are + * validated in analyze() + */ + public CreateUdaStmt(FunctionName fnSymbol, FunctionArgs args, + TypeDef retTypeDef, TypeDef intermediateTypeDef, + HdfsUri location, boolean ifNotExists, + HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) { + super(fnSymbol, args, retTypeDef, location, ifNotExists, optArgs); + intermediateTypeDef_ = intermediateTypeDef; + } + + private void reportCouldNotInferSymbol(String function) throws AnalysisException { + throw new AnalysisException("Could not infer symbol for " + + function + "() function."); + } + + // Gets the symbol for 'arg'. If the user set it from the dll, return that. Otherwise + // try to infer the Symbol from the Update function. 
To infer the Symbol, the update + // function must contain "update" or "Update" and we switch that out with 'defaultSymbol'. + // Returns null if no symbol was found. + private String getSymbolSymbol(OptArg arg, String defaultSymbol) { + // First lookup if the user explicitly set it. + if (optArgs_.get(arg) != null) return optArgs_.get(arg); + // Try to match it from Update + String updateFn = optArgs_.get(OptArg.UPDATE_FN); + // Mangled strings start with _Z. We can't get substitute Symbols for mangled + // strings. + // TODO: this is doable in the BE with more symbol parsing. + if (updateFn.startsWith("_Z")) return null; + + if (updateFn.contains("update")) return updateFn.replace("update", defaultSymbol); + if (updateFn.contains("Update")) { + char[] array = defaultSymbol.toCharArray(); + array[0] = Character.toUpperCase(array[0]); + String s = new String(array); + return updateFn.replace("Update", s); + } + return null; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + super.analyze(analyzer); + Preconditions.checkNotNull(fn_); + Preconditions.checkState(fn_ instanceof AggregateFunction); + AggregateFunction uda = (AggregateFunction) fn_; + + if (uda.getNumArgs() == 0) { + throw new AnalysisException("UDAs must take at least one argument."); + } + + if (uda.getBinaryType() == TFunctionBinaryType.JAVA) { + throw new AnalysisException("Java UDAs are not supported."); + } + + // TODO: these are temporarily restrictions since the BE cannot yet + // execute them. + if (uda.getBinaryType() == TFunctionBinaryType.IR) { + throw new AnalysisException("IR UDAs are not yet supported."); + } + if (fn_.hasVarArgs()) { + throw new AnalysisException("UDAs with varargs are not yet supported."); + } + if (fn_.getNumArgs() > 8) { + throw new AnalysisException( + "UDAs with more than 8 arguments are not yet supported."); + } + + if (uda.getReturnType().getPrimitiveType() == PrimitiveType.CHAR) { + throw new AnalysisException("UDAs with CHAR return type are not yet supported."); + } + if (uda.getReturnType().getPrimitiveType() == PrimitiveType.VARCHAR) { + throw new AnalysisException("UDAs with VARCHAR return type are not yet supported."); + } + for (int i = 0; i < uda.getNumArgs(); ++i) { + if (uda.getArgs()[i].getPrimitiveType() == PrimitiveType.CHAR) { + throw new AnalysisException("UDAs with CHAR arguments are not yet supported."); + } + if (uda.getArgs()[i].getPrimitiveType() == PrimitiveType.VARCHAR) { + throw new AnalysisException("UDAs with VARCHAR arguments are not yet supported."); + } + } + + Type intermediateType = null; + if (intermediateTypeDef_ == null) { + intermediateType = uda.getReturnType(); + } else { + intermediateTypeDef_.analyze(analyzer); + intermediateType = intermediateTypeDef_.getType(); + } + uda.setIntermediateType(intermediateType); + + // Check arguments that are only valid in UDFs are not set. + checkOptArgNotSet(OptArg.SYMBOL); + checkOptArgNotSet(OptArg.PREPARE_FN); + checkOptArgNotSet(OptArg.CLOSE_FN); + + // The user must provide the symbol for Update. + uda.setUpdateFnSymbol(uda.lookupSymbol( + checkAndGetOptArg(OptArg.UPDATE_FN), TSymbolType.UDF_EVALUATE, intermediateType, + uda.hasVarArgs(), uda.getArgs())); + + // If the ddl did not specify the init/serialize/merge/finalize function + // Symbols, guess them based on the update fn Symbol. 
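// (Illustrative note, not part of this commit: with UPDATE_FN set to 'MyAvgUpdate',
//  the inferred symbols would be 'MyAvgInit', 'MyAvgSerialize', 'MyAvgMerge' and
//  'MyAvgFinalize'. A mangled C++ symbol starting with '_Z' cannot be rewritten this
//  way; in that case INIT_FN and MERGE_FN must be given explicitly, while the optional
//  serialize/finalize symbols simply stay unset.)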
+ Preconditions.checkNotNull(uda.getUpdateFnSymbol()); + uda.setInitFnSymbol(getSymbolSymbol(OptArg.INIT_FN, "init")); + uda.setSerializeFnSymbol(getSymbolSymbol(OptArg.SERIALIZE_FN, "serialize")); + uda.setMergeFnSymbol(getSymbolSymbol(OptArg.MERGE_FN, "merge")); + uda.setFinalizeFnSymbol(getSymbolSymbol(OptArg.FINALIZE_FN, "finalize")); + + // Init and merge are required. + if (uda.getInitFnSymbol() == null) reportCouldNotInferSymbol("init"); + if (uda.getMergeFnSymbol() == null) reportCouldNotInferSymbol("merge"); + + // Validate that all set symbols exist. + uda.setInitFnSymbol(uda.lookupSymbol(uda.getInitFnSymbol(), + TSymbolType.UDF_EVALUATE, intermediateType, false)); + uda.setMergeFnSymbol(uda.lookupSymbol(uda.getMergeFnSymbol(), + TSymbolType.UDF_EVALUATE, intermediateType, false, intermediateType)); + if (uda.getSerializeFnSymbol() != null) { + try { + uda.setSerializeFnSymbol(uda.lookupSymbol(uda.getSerializeFnSymbol(), + TSymbolType.UDF_EVALUATE, null, false, intermediateType)); + } catch (AnalysisException e) { + if (optArgs_.get(OptArg.SERIALIZE_FN) != null) { + throw e; + } else { + // Ignore, these symbols are optional. + uda.setSerializeFnSymbol(null); + } + } + } + if (uda.getFinalizeFnSymbol() != null) { + try { + uda.setFinalizeFnSymbol(uda.lookupSymbol( + uda.getFinalizeFnSymbol(), TSymbolType.UDF_EVALUATE, null, false, + intermediateType)); + } catch (AnalysisException e) { + if (optArgs_.get(OptArg.FINALIZE_FN) != null) { + throw e; + } else { + // Ignore, these symbols are optional. + uda.setFinalizeFnSymbol(null); + } + } + } + + // If the intermediate type is not the return type, then finalize is + // required. + if (!intermediateType.equals(fn_.getReturnType()) && + uda.getFinalizeFnSymbol() == null) { + throw new AnalysisException("Finalize() is required for this UDA."); + } + + sqlString_ = uda.toSql(ifNotExists_); + } + + @Override + protected Function createFunction(FunctionName fnName, ArrayList<Type> argTypes, + Type retType, boolean hasVarArgs) { + return new AggregateFunction(fnName_, args_.getArgTypes(), retTypeDef_.getType(), + args_.hasVarArgs()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java new file mode 100644 index 0000000..550d26f --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateUdfStmt.java @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package com.cloudera.impala.analysis; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import com.cloudera.impala.catalog.Db; +import com.cloudera.impala.catalog.Function; +import com.cloudera.impala.catalog.PrimitiveType; +import com.cloudera.impala.catalog.ScalarFunction; +import com.cloudera.impala.catalog.ScalarType; +import com.cloudera.impala.catalog.Type; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.hive.executor.UdfExecutor.JavaUdfDataType; +import com.cloudera.impala.thrift.TFunctionBinaryType; +import com.cloudera.impala.thrift.TFunctionCategory; +import com.cloudera.impala.thrift.TSymbolType; +import com.google.common.base.Preconditions; + +/** + * Represents a CREATE FUNCTION statement. + */ +public class CreateUdfStmt extends CreateFunctionStmtBase { + /** + * Builds a CREATE FUNCTION statement + * @param fnName - Name of the function + * @param fnArgs - List of types for the arguments to this function + * @param retType - The type this function returns. + * @param location - Path in HDFS containing the UDF. + * @param ifNotExists - If true, no errors are thrown if the function already exists + * @param additionalArgs - Key/Value pairs for additional arguments. The keys are + * validated in analyze() + */ + public CreateUdfStmt(FunctionName fnName, FunctionArgs args, + TypeDef retTypeDef, HdfsUri location, boolean ifNotExists, + HashMap<CreateFunctionStmtBase.OptArg, String> optArgs) { + super(fnName, args, retTypeDef, location, ifNotExists, optArgs); + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + super.analyze(analyzer); + Preconditions.checkNotNull(fn_); + Preconditions.checkState(fn_ instanceof ScalarFunction); + ScalarFunction udf = (ScalarFunction) fn_; + + if (hasSignature()) { + if (udf.getBinaryType() == TFunctionBinaryType.JAVA) { + if (!JavaUdfDataType.isSupported(udf.getReturnType())) { + throw new AnalysisException( + "Type " + udf.getReturnType().toSql() + " is not supported for Java UDFs."); + } + for (int i = 0; i < udf.getNumArgs(); ++i) { + if (!JavaUdfDataType.isSupported(udf.getArgs()[i])) { + throw new AnalysisException( + "Type " + udf.getArgs()[i].toSql() + " is not supported for Java UDFs."); + } + } + } + + if (udf.getReturnType().getPrimitiveType() == PrimitiveType.CHAR) { + throw new AnalysisException("UDFs that use CHAR are not yet supported."); + } + if (udf.getReturnType().getPrimitiveType() == PrimitiveType.VARCHAR) { + throw new AnalysisException("UDFs that use VARCHAR are not yet supported."); + } + for (int i = 0; i < udf.getNumArgs(); ++i) { + if (udf.getArgs()[i].getPrimitiveType() == PrimitiveType.CHAR) { + throw new AnalysisException("UDFs that use CHAR are not yet supported."); + } + if (udf.getArgs()[i].getPrimitiveType() == PrimitiveType.VARCHAR) { + throw new AnalysisException("UDFs that use VARCHAR are not yet supported."); + } + } + } + + // Check the user provided symbol exists + udf.setSymbolName(udf.lookupSymbol( + checkAndGetOptArg(OptArg.SYMBOL), TSymbolType.UDF_EVALUATE, null, + udf.hasVarArgs(), udf.getArgs())); + + // Set optional Prepare/Close functions + String prepareFn = optArgs_.get(OptArg.PREPARE_FN); + if (prepareFn != null) { + udf.setPrepareFnSymbol(udf.lookupSymbol(prepareFn, TSymbolType.UDF_PREPARE)); + } + String closeFn = optArgs_.get(OptArg.CLOSE_FN); + if (closeFn != null) { + udf.setCloseFnSymbol(udf.lookupSymbol(closeFn, TSymbolType.UDF_CLOSE)); + } + + // Udfs should not set any of these +
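// (Illustrative aside, not part of the original patch: UPDATE_FN, INIT_FN, SERIALIZE_FN, MERGE_FN and FINALIZE_FN only make sense for aggregate functions, so the checkOptArgNotSet() calls below reject them for scalar UDFs, mirroring how CreateUdaStmt rejects the UDF-only SYMBOL, PREPARE_FN and CLOSE_FN options.) +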
checkOptArgNotSet(OptArg.UPDATE_FN); + checkOptArgNotSet(OptArg.INIT_FN); + checkOptArgNotSet(OptArg.SERIALIZE_FN); + checkOptArgNotSet(OptArg.MERGE_FN); + checkOptArgNotSet(OptArg.FINALIZE_FN); + + sqlString_ = udf.toSql(ifNotExists_); + + // Check that there is no function with the same name and isPersistent field not + // the same as udf.isPersistent_. For example we don't allow two JAVA udfs with + // same name and opposite persistence values set. This only applies for JAVA udfs + // as all the native udfs are persisted. Additionally we don't throw exceptions + // if "IF NOT EXISTS" is specified in the query. + if (udf.getBinaryType() != TFunctionBinaryType.JAVA || ifNotExists_) return; + + Preconditions.checkNotNull(db_); + for (Function fn: db_.getFunctions(udf.functionName())) { + if (!hasSignature() || (hasSignature() && fn.isPersistent())) { + throw new AnalysisException( + String.format(Analyzer.FN_ALREADY_EXISTS_ERROR_MSG + + fn.signatureString())); + } + } + } + + @Override + protected Function createFunction(FunctionName fnName, ArrayList<Type> argTypes, Type retType, + boolean hasVarArgs) { + return new ScalarFunction(fnName, argTypes, retType, hasVarArgs); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java new file mode 100644 index 0000000..c38eef0 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import java.util.ArrayList; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.common.RuntimeEnv; +import com.cloudera.impala.thrift.TAccessEvent; +import com.cloudera.impala.thrift.TCatalogObjectType; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; + +/** + * Represents a CREATE VIEW statement. + */ +public class CreateViewStmt extends CreateOrAlterViewStmtBase { + + public CreateViewStmt(boolean ifNotExists, TableName tableName, + ArrayList<ColumnDef> columnDefs, String comment, QueryStmt viewDefStmt) { + super(ifNotExists, tableName, columnDefs, comment, viewDefStmt); + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + Preconditions.checkState(tableName_ != null && !tableName_.isEmpty()); + + tableName_.analyze(); + // Use a child analyzer to let views have complex-typed columns. 
+ Analyzer viewAnalyzer = new Analyzer(analyzer); + // Enforce Hive column labels for view compatibility. + viewAnalyzer.setUseHiveColLabels(true); + viewDefStmt_.analyze(viewAnalyzer); + + dbName_ = analyzer.getTargetDbName(tableName_); + owner_ = analyzer.getUser().getName(); + if (analyzer.dbContainsTable(dbName_, tableName_.getTbl(), Privilege.CREATE) && + !ifNotExists_) { + throw new AnalysisException(Analyzer.TBL_ALREADY_EXISTS_ERROR_MSG + + String.format("%s.%s", dbName_, tableName_.getTbl())); + } + analyzer.addAccessEvent(new TAccessEvent(dbName_ + "." + tableName_.getTbl(), + TCatalogObjectType.VIEW, Privilege.CREATE.toString())); + + createColumnAndViewDefs(analyzer); + if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) { + computeLineageGraph(analyzer); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("CREATE VIEW "); + if (ifNotExists_) sb.append("IF NOT EXISTS "); + if (tableName_.getDb() != null) sb.append(tableName_.getDb() + "."); + sb.append(tableName_.getTbl() + " ("); + sb.append(Joiner.on(", ").join(columnDefs_)); + sb.append(") AS "); + sb.append(viewDefStmt_.toSql()); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java new file mode 100644 index 0000000..efa2117 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import java.util.List; + +import com.cloudera.impala.common.Pair; +import com.cloudera.impala.planner.DataSink; +import com.cloudera.impala.planner.KuduTableSink; +import com.cloudera.impala.planner.TableSink; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.hadoop.hbase.client.Delete; + +/** + * Representation of a DELETE statement. + * + * A delete statement contains three main parts: the target table reference, the from + * clause and the optional where clause. Syntactically, this is represented as follows: + * + * DELETE [FROM] dotted_path [WHERE expr] + * DELETE [table_alias] FROM table_ref_list [WHERE expr] + * + * Only the syntax using the explicit from clause can contain join conditions.
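+ *
+ * For example (illustrative only, not part of the original patch):
+ *   DELETE FROM target_tbl WHERE id > 100
+ *   DELETE t FROM target_tbl t JOIN staging s ON t.id = s.id WHERE s.deleted = true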
+ */ +public class DeleteStmt extends ModifyStmt { + + public DeleteStmt(List<String> targetTablePath, FromClause tableRefs, + Expr wherePredicate, boolean ignoreNotFound) { + super(targetTablePath, tableRefs, Lists.<Pair<SlotRef, Expr>>newArrayList(), + wherePredicate, ignoreNotFound); + } + + public DeleteStmt(DeleteStmt other) { + super(other.targetTablePath_, other.fromClause_.clone(), + Lists.<Pair<SlotRef, Expr>>newArrayList(), other.wherePredicate_.clone(), + other.ignoreNotFound_); + } + + public DataSink createDataSink() { + // analyze() must have been called before. + Preconditions.checkState(table_ != null); + TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE, + ImmutableList.<Expr>of(), referencedColumns_, false, ignoreNotFound_); + Preconditions.checkState(!referencedColumns_.isEmpty()); + return tableSink; + } + + @Override + public DeleteStmt clone() { + return new DeleteStmt(this); + } + + @Override + public String toSql() { + StringBuilder b = new StringBuilder(); + b.append("DELETE"); + if (ignoreNotFound_) b.append(" IGNORE"); + if (fromClause_.size() > 1 || targetTableRef_.hasExplicitAlias()) { + b.append(" "); + if (targetTableRef_.hasExplicitAlias()) { + b.append(targetTableRef_.getExplicitAlias()); + } else { + b.append(targetTableRef_.toSql()); + } + } + b.append(fromClause_.toSql()); + if (wherePredicate_ != null) { + b.append(" WHERE "); + b.append(wherePredicate_.toSql()); + } + return b.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java new file mode 100644 index 0000000..0ddd6ec --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/DescribeDbStmt.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.cloudera.impala.analysis; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.TDescribeDbParams; +import com.cloudera.impala.thrift.TDescribeOutputStyle; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +/** + * Represents a DESCRIBE DATABASE statement which returns metadata on + * a specified database: + * Syntax: DESCRIBE DATABASE [FORMATTED|EXTENDED] <db> + * + * If FORMATTED|EXTENDED is not specified, the statement only returns the given + * database's location and comment. + * If FORMATTED|EXTENDED is specified, extended metadata on the database is returned. 
+ * This metadata includes info about the database's parameters, owner info + * and privileges. + */ +public class DescribeDbStmt extends StatementBase { + private final TDescribeOutputStyle outputStyle_; + private final String dbName_; + + public DescribeDbStmt(String dbName, TDescribeOutputStyle outputStyle) { + Preconditions.checkState(!Strings.isNullOrEmpty(dbName), "Invalid database name"); + dbName_ = dbName; + outputStyle_ = outputStyle; + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder("DESCRIBE DATABASE "); + if (outputStyle_ != TDescribeOutputStyle.MINIMAL) { + sb.append(outputStyle_.toString() + " "); + } + return sb.toString() + dbName_; + } + + public String getDb() { return dbName_; } + public TDescribeOutputStyle getOutputStyle() { return outputStyle_; } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + analyzer.getDb(dbName_, Privilege.VIEW_METADATA); + } + + public TDescribeDbParams toThrift() { + TDescribeDbParams params = new TDescribeDbParams(); + params.setDb(dbName_); + params.setOutput_style(outputStyle_); + return params; + } +}
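The toSql() above only prints the output-style keyword for the non-default styles. Below is a minimal standalone sketch of that behaviour (not part of the patch); the plain enum stands in for the generated thrift TDescribeOutputStyle, whose MINIMAL/FORMATTED/EXTENDED values are assumed from the code and class comment above.

enum OutputStyle { MINIMAL, FORMATTED, EXTENDED }

class DescribeDbSqlSketch {
  // Mirrors DescribeDbStmt.toSql(): the style keyword is omitted for the default (MINIMAL) style.
  static String toSql(String dbName, OutputStyle style) {
    StringBuilder sb = new StringBuilder("DESCRIBE DATABASE ");
    if (style != OutputStyle.MINIMAL) sb.append(style).append(" ");
    return sb.append(dbName).toString();
  }

  public static void main(String[] args) {
    System.out.println(toSql("tpch", OutputStyle.MINIMAL));    // DESCRIBE DATABASE tpch
    System.out.println(toSql("tpch", OutputStyle.EXTENDED));   // DESCRIBE DATABASE EXTENDED tpch
  }
}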
