DRILL-4956: Temporary tables support (closes apache/drill#666)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bb29f19f Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bb29f19f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bb29f19f Branch: refs/heads/master Commit: bb29f19ff8807fd07cdaa9e7110c1a003b3da15b Parents: 8a4d7a9 Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> Authored: Thu Nov 3 16:55:38 2016 +0000 Committer: Jinfeng Ni <j...@apache.org> Committed: Mon Jan 23 17:08:06 2017 -0800 ---------------------------------------------------------------------- .../src/resources/drill-override-example.conf | 8 +- .../src/main/codegen/includes/parserImpls.ftl | 9 +- .../org/apache/drill/exec/ExecConstants.java | 7 +- .../exec/physical/base/AbstractWriter.java | 17 +- .../exec/physical/impl/WriterRecordBatch.java | 22 +- .../logical/FileSystemCreateTableEntry.java | 18 +- .../drill/exec/planner/sql/DrillSqlWorker.java | 9 +- .../drill/exec/planner/sql/SchemaUtilites.java | 42 +- .../drill/exec/planner/sql/SqlConverter.java | 141 +++++-- .../sql/handlers/CreateTableHandler.java | 111 ++++- .../planner/sql/handlers/DropTableHandler.java | 65 ++- .../planner/sql/handlers/SqlHandlerUtil.java | 49 ++- .../exec/planner/sql/handlers/ViewHandler.java | 63 +-- .../sql/parser/CompoundIdentifierConverter.java | 4 +- .../exec/planner/sql/parser/SqlCreateTable.java | 28 +- .../apache/drill/exec/rpc/user/UserServer.java | 13 +- .../apache/drill/exec/rpc/user/UserSession.java | 170 +++++++- .../org/apache/drill/exec/server/Drillbit.java | 23 +- .../apache/drill/exec/store/AbstractSchema.java | 21 +- .../drill/exec/store/SchemaTreeProvider.java | 32 +- .../drill/exec/store/StorageStrategy.java | 194 +++++++++ .../drill/exec/store/SubSchemaWrapper.java | 6 +- .../drill/exec/store/dfs/FileSelection.java | 5 +- .../exec/store/dfs/FileSystemSchemaFactory.java | 10 +- .../exec/store/dfs/WorkspaceSchemaFactory.java | 10 +- 
.../drill/exec/store/dfs/easy/EasyWriter.java | 10 +- .../exec/store/easy/json/JSONFormatPlugin.java | 4 +- .../exec/store/easy/json/JsonRecordWriter.java | 24 +- .../exec/store/easy/text/TextFormatPlugin.java | 4 +- .../exec/store/parquet/ParquetRecordWriter.java | 64 ++- .../drill/exec/store/parquet/ParquetWriter.java | 10 +- .../exec/store/text/DrillTextRecordWriter.java | 31 +- .../src/main/resources/drill-module.conf | 8 +- .../java/org/apache/drill/BaseTestQuery.java | 3 +- .../java/org/apache/drill/TestDropTable.java | 10 +- .../user/TemporaryTablesAutomaticDropTest.java | 95 +++++ .../drill/exec/sql/TestBaseViewSupport.java | 4 +- .../org/apache/drill/exec/sql/TestCTTAS.java | 422 +++++++++++++++++++ .../drill/exec/store/StorageStrategyTest.java | 222 ++++++++++ .../resources/bootstrap-storage-plugins.json | 2 +- .../apache/drill/exec/rpc/RemoteConnection.java | 9 +- .../java/org/apache/drill/exec/rpc/RpcBus.java | 8 +- 42 files changed, 1797 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/distribution/src/resources/drill-override-example.conf ---------------------------------------------------------------------- diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf index f9d27b3..3baac5e 100644 --- a/distribution/src/resources/drill-override-example.conf +++ b/distribution/src/resources/drill-override-example.conf @@ -184,7 +184,13 @@ drill.exec: { # Set this property if custom absolute root should be used for remote directories root: "/app/drill" } - } + }, + # Settings for Temporary Tables. + # See https://gist.github.com/arina-ielchiieva/50158175867a18eee964b5ba36455fbf#file-temporarytablessupport-md. + # Temporary table can be created ONLY in default temporary workspace. + # Full workspace name should be indicated (including schema and workspace separated by dot). 
+ # Workspace MUST be file-based and writable. Workspace name is case-sensitive. + default_temporary_workspace: "dfs.tmp" } # Below SSL parameters need to be set for custom transport layer settings. http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/codegen/includes/parserImpls.ftl ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl index 0017446..d9ceed9 100644 --- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl +++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl @@ -214,8 +214,8 @@ SqlNode SqlDropView() : } /** - * Parses a CTAS statement. - * CREATE TABLE tblname [ (field1, field2, ...) ] AS select_statement. + * Parses a CTAS or CTTAS statement. + * CREATE [TEMPORARY] TABLE tblname [ (field1, field2, ...) ] AS select_statement. */ SqlNode SqlCreateTable() : { @@ -224,12 +224,14 @@ SqlNode SqlCreateTable() : SqlNodeList fieldList; SqlNodeList partitionFieldList; SqlNode query; + boolean isTemporary = false; } { { partitionFieldList = SqlNodeList.EMPTY; } <CREATE> { pos = getPos(); } + ( <TEMPORARY> { isTemporary = true; } )? 
<TABLE> tblName = CompoundIdentifier() fieldList = ParseOptionalFieldList("Table") @@ -239,7 +241,8 @@ SqlNode SqlCreateTable() : <AS> query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) { - return new SqlCreateTable(pos, tblName, fieldList, partitionFieldList, query); + return new SqlCreateTable(pos,tblName, fieldList, partitionFieldList, query, + SqlLiteral.createBoolean(isTemporary, getPos())); } } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 740eb4b..e8cc75c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -128,6 +128,11 @@ public interface ExecConstants { */ String DRILL_TMP_DIR = "drill.tmp-dir"; + /** + * Temporary tables can be created ONLY in default temporary workspace. 
+ */ + String DEFAULT_TEMPORARY_WORKSPACE = "drill.exec.default_temporary_workspace"; + String OUTPUT_FORMAT_OPTION = "store.format"; OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet"); String PARQUET_BLOCK_SIZE = "store.parquet.block-size"; http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java index af23d5f..6ba570b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -17,7 +17,12 @@ */ package org.apache.drill.exec.physical.base; -public abstract class AbstractWriter extends AbstractSingle implements Writer{ +import org.apache.drill.exec.store.StorageStrategy; + +public abstract class AbstractWriter extends AbstractSingle implements Writer { + + /** Storage strategy is used during table folder and files creation*/ + private StorageStrategy storageStrategy; public AbstractWriter(PhysicalOperator child) { super(child); @@ -27,4 +32,12 @@ public abstract class AbstractWriter extends AbstractSingle implements Writer{ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { return physicalVisitor.visitWriter(this, value); } + + public void setStorageStrategy(StorageStrategy storageStrategy) { + this.storageStrategy = storageStrategy; + } + + public StorageStrategy getStorageStrategy() { + return storageStrategy; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index e6c946c..939832b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -174,13 +174,25 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { schema = container.getSchema(); } + /** Clean up needs to be performed before closing writer. Partially written data will be removed. */ private void closeWriter() { - if (recordWriter != null) { + if (recordWriter == null) { + return; + } + + try { + recordWriter.cleanup(); + } catch(IOException ex) { + context.fail(ex); + } finally { try { - recordWriter.cleanup(); + if (!processed) { + recordWriter.abort(); + } + } catch (IOException e) { + logger.error("Abort failed. There could be leftover output files.", e); + } finally { recordWriter = null; - } catch(IOException ex) { - context.fail(ex); } } } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java index 90eb05c..23ea23f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,9 +22,10 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.exec.physical.base.AbstractWriter; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.physical.base.Writer; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.dfs.FileSystemConfig; @@ -34,7 +35,6 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.drill.exec.store.ischema.Records; /** * Implements <code>CreateTableEntry</code> interface to create new tables in FileSystem storage. 
@@ -47,28 +47,33 @@ public class FileSystemCreateTableEntry implements CreateTableEntry { private FormatPlugin formatPlugin; private String location; private final List<String> partitionColumns; + private final StorageStrategy storageStrategy; @JsonCreator public FileSystemCreateTableEntry(@JsonProperty("storageConfig") FileSystemConfig storageConfig, @JsonProperty("formatConfig") FormatPluginConfig formatConfig, @JsonProperty("location") String location, @JsonProperty("partitionColumn") List<String> partitionColumns, + @JsonProperty("storageStrategy") StorageStrategy storageStrategy, @JacksonInject StoragePluginRegistry engineRegistry) throws ExecutionSetupException { this.storageConfig = storageConfig; this.formatPlugin = engineRegistry.getFormatPlugin(storageConfig, formatConfig); this.location = location; this.partitionColumns = partitionColumns; + this.storageStrategy = storageStrategy; } public FileSystemCreateTableEntry(FileSystemConfig storageConfig, FormatPlugin formatPlugin, String location, - List<String> partitionColumns) { + List<String> partitionColumns, + StorageStrategy storageStrategy) { this.storageConfig = storageConfig; this.formatPlugin = formatPlugin; this.location = location; this.partitionColumns = partitionColumns; + this.storageStrategy = storageStrategy; } @JsonProperty("storageConfig") @@ -89,11 +94,14 @@ public class FileSystemCreateTableEntry implements CreateTableEntry { formatPlugin.getName())).build(logger); } - return formatPlugin.getWriter(child, location, partitionColumns); + AbstractWriter writer = formatPlugin.getWriter(child, location, partitionColumns); + writer.setStorageStrategy(storageStrategy); + return writer; } @Override public List<String> getPartitionColumns() { return partitionColumns; } + } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index 76529d4..0ad3944 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -58,12 +58,7 @@ public class DrillSqlWorker { public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan) throws ForemanSetupException { - final SqlConverter parser = new SqlConverter( - context.getPlannerSettings(), - context.getNewDefaultSchema(), - context.getDrillOperatorTable(), - (UdfUtilities) context, - context.getFunctionRegistry()); + final SqlConverter parser = new SqlConverter(context); injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class); final SqlNode sqlNode = parser.parse(sql); http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java index 085f808..20c92c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,8 +22,12 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.ValidationException; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.store.AbstractSchema; +import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory; import java.util.Collections; import java.util.List; @@ -116,6 +120,11 @@ public class SchemaUtilites { return SCHEMA_PATH_JOINER.join(getSchemaPathAsList(schema)); } + /** Utility method to get the schema path for given list of schema path. */ + public static String getSchemaPath(List<String> schemaPath) { + return SCHEMA_PATH_JOINER.join(schemaPath); + } + /** Utility method to get the schema path as list for given schema instance. */ public static List<String> getSchemaPathAsList(SchemaPlus schema) { if (isRootSchema(schema)) { @@ -177,4 +186,35 @@ public class SchemaUtilites { return drillSchema; } + + /** + * Looks in schema tree for default temporary workspace instance. + * Makes sure that temporary workspace is mutable and file-based + * (instance of {@link WorkspaceSchemaFactory.WorkspaceSchema}). 
+ * + * @param defaultSchema default schema + * @param config drill config + * @return default temporary workspace + */ + public static AbstractSchema getTemporaryWorkspace(SchemaPlus defaultSchema, DrillConfig config) { + List<String> temporarySchemaPath = Lists.newArrayList(config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE)); + AbstractSchema temporarySchema = resolveToMutableDrillSchema(defaultSchema, temporarySchemaPath); + if (!(temporarySchema instanceof WorkspaceSchemaFactory.WorkspaceSchema)) { + DrillRuntimeException.format("Temporary workspace [%s] must be file-based, instance of " + + "WorkspaceSchemaFactory.WorkspaceSchema", temporarySchemaPath); + } + return temporarySchema; + } + + /** + * Checks that passed schema path is the same as temporary workspace path. + * Check is case-sensitive. + * + * @param schemaPath schema path + * @param config drill config + * @return true if schema path corresponds to temporary workspace, false otherwise + */ + public static boolean isTemporaryWorkspace(String schemaPath, DrillConfig config) { + return schemaPath.equals(config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE)); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java index 0c3c6a0..28196c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -21,10 +21,12 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.avatica.util.Casing; import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.jdbc.CalciteSchemaImpl; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.plan.ConventionTraitDef; @@ -33,6 +35,7 @@ import org.apache.calcite.plan.RelOptCostFactory; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.volcano.VolcanoPlanner; import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.prepare.RelOptTableImpl; import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; @@ -41,17 +44,14 @@ import org.apache.calcite.rel.type.RelDataTypeSystemImpl; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperatorTable; -import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.sql.parser.SqlParserImplFactory; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.util.ChainedSqlOperatorTable; -import org.apache.calcite.sql.validate.AggregatingSelectScope; import org.apache.calcite.sql.validate.SqlConformance; import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; import org.apache.calcite.sql.validate.SqlValidatorException; @@ -61,18 +61,21 @@ import 
org.apache.calcite.sql2rel.RelDecorrelator; import org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.exception.FunctionNotFoundException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.planner.cost.DrillCostBase; import org.apache.drill.exec.planner.logical.DrillConstExecutor; import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef; import org.apache.drill.exec.planner.physical.PlannerSettings; -import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.sql.parser.impl.DrillParserWithCompoundIdConverter; import com.google.common.base.Joiner; +import org.apache.drill.exec.rpc.user.UserSession; /** * Class responsible for managing parsing, validation and toRel conversion for sql statements. 
@@ -86,7 +89,7 @@ public class SqlConverter { private final SqlParser.Config parserConfig; // Allow the default config to be modified using immutable configs private SqlToRelConverter.Config sqlToRelConverterConfig; - private final CalciteCatalogReader catalog; + private final DrillCalciteCatalogReader catalog; private final PlannerSettings settings; private final SchemaPlus rootSchema; private final SchemaPlus defaultSchema; @@ -96,35 +99,42 @@ public class SqlConverter { private final boolean isInnerQuery; private final UdfUtilities util; private final FunctionImplementationRegistry functions; + private final String temporarySchema; + private final UserSession session; + private final DrillConfig drillConfig; private String sql; private VolcanoPlanner planner; - public SqlConverter(PlannerSettings settings, SchemaPlus defaultSchema, - final SqlOperatorTable operatorTable, UdfUtilities util, FunctionImplementationRegistry functions) { - this.settings = settings; - this.util = util; - this.functions = functions; + public SqlConverter(QueryContext context) { + this.settings = context.getPlannerSettings(); + this.util = (UdfUtilities) context; + this.functions = context.getFunctionRegistry(); this.parserConfig = new ParserConfig(); this.sqlToRelConverterConfig = new SqlToRelConverterConfig(); this.isInnerQuery = false; this.typeFactory = new JavaTypeFactoryImpl(DRILL_TYPE_SYSTEM); - this.defaultSchema = defaultSchema; + this.defaultSchema = context.getNewDefaultSchema(); this.rootSchema = rootSchema(defaultSchema); - this.catalog = new CalciteCatalogReader( + this.temporarySchema = context.getConfig().getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE); + this.session = context.getSession(); + this.drillConfig = context.getConfig(); + this.catalog = new DrillCalciteCatalogReader( CalciteSchemaImpl.from(rootSchema), parserConfig.caseSensitive(), CalciteSchemaImpl.from(defaultSchema).path(null), - typeFactory); - this.opTab = new 
ChainedSqlOperatorTable(Arrays.asList(operatorTable, catalog)); + typeFactory, + drillConfig, + session); + this.opTab = new ChainedSqlOperatorTable(Arrays.asList(context.getDrillOperatorTable(), catalog)); this.costFactory = (settings.useDefaultCosting()) ? null : new DrillCostBase.DrillCostFactory(); this.validator = new DrillValidator(opTab, catalog, typeFactory, SqlConformance.DEFAULT); validator.setIdentifierExpansion(true); } private SqlConverter(SqlConverter parent, SchemaPlus defaultSchema, SchemaPlus rootSchema, - CalciteCatalogReader catalog) { + DrillCalciteCatalogReader catalog) { this.parserConfig = parent.parserConfig; this.sqlToRelConverterConfig = parent.sqlToRelConverterConfig; this.defaultSchema = defaultSchema; @@ -139,6 +149,9 @@ public class SqlConverter { this.opTab = parent.opTab; this.planner = parent.planner; this.validator = new DrillValidator(opTab, catalog, typeFactory, SqlConformance.DEFAULT); + this.temporarySchema = parent.temporarySchema; + this.session = parent.session; + this.drillConfig = parent.drillConfig; validator.setIdentifierExpansion(true); } @@ -203,6 +216,11 @@ public class SqlConverter { return defaultSchema; } + /** Disallow temporary tables presence in sql statement (ex: in view definitions) */ + public void disallowTemporaryTables() { + catalog.disallowTemporaryTables(); + } + private class DrillValidator extends SqlValidatorImpl { private final Set<SqlValidatorScope> identitySet = Sets.newIdentityHashSet(); @@ -272,26 +290,27 @@ public class SqlConverter { public Expander() { } - public RelNode expandView( - RelDataType rowType, - String queryString, - List<String> schemaPath) { - SqlConverter parser = new SqlConverter(SqlConverter.this, defaultSchema, rootSchema, - catalog.withSchemaPath(schemaPath)); + public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) { + final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader( + CalciteSchemaImpl.from(rootSchema), + 
parserConfig.caseSensitive(), + schemaPath, + typeFactory, + drillConfig, + session); + final SqlConverter parser = new SqlConverter(SqlConverter.this, defaultSchema, rootSchema, catalogReader); return expandView(queryString, parser); } @Override - public RelNode expandView( - RelDataType rowType, - String queryString, - SchemaPlus rootSchema, // new root schema - List<String> schemaPath) { - final CalciteCatalogReader catalogReader = new CalciteCatalogReader( - CalciteSchemaImpl.from(rootSchema), + public RelNode expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) { + final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader( + CalciteSchemaImpl.from(rootSchema), // new root schema parserConfig.caseSensitive(), schemaPath, - typeFactory); + typeFactory, + drillConfig, + session); SchemaPlus schema = rootSchema; for (String s : schemaPath) { SchemaPlus newSchema = schema.getSubSchema(s); @@ -447,4 +466,66 @@ public class SqlConverter { return node; } } + + /** + * Extension of {@link CalciteCatalogReader} to add ability to check for temporary tables first + * if schema is not indicated near table name during query parsing + * or indicated workspace is default temporary workspace. 
+ */ + private class DrillCalciteCatalogReader extends CalciteCatalogReader { + + private final DrillConfig drillConfig; + private final UserSession session; + private boolean allowTemporaryTables; + + DrillCalciteCatalogReader(CalciteSchema rootSchema, + boolean caseSensitive, + List<String> defaultSchema, + JavaTypeFactory typeFactory, + DrillConfig drillConfig, + UserSession session) { + super(rootSchema, caseSensitive, defaultSchema, typeFactory); + this.drillConfig = drillConfig; + this.session = session; + this.allowTemporaryTables = true; + } + + /** Disallow temporary tables presence in sql statement (ex: in view definitions) */ + public void disallowTemporaryTables() { + this.allowTemporaryTables = false; + } + + /** + * If schema is not indicated (only one element in the list) or schema is default temporary workspace, + * we need to check among session temporary tables first in default temporary workspace. + * If temporary table is found and temporary tables usage is allowed, its table instance will be returned, + * otherwise search will be conducted in original workspace. 
+ * + * @param names list of schema and table names, table name is always the last element + * @return table instance, null otherwise + * @throws UserException if temporary tables usage is disallowed + */ + @Override + public RelOptTableImpl getTable(final List<String> names) { + RelOptTableImpl temporaryTable = null; + String schemaPath = SchemaUtilites.getSchemaPath(names.subList(0, names.size() - 1)); + if (names.size() == 1 || SchemaUtilites.isTemporaryWorkspace(schemaPath, drillConfig)) { + String temporaryTableName = session.resolveTemporaryTableName(names.get(names.size() - 1)); + if (temporaryTableName != null) { + List<String> temporaryNames = Lists.newArrayList(temporarySchema, temporaryTableName); + temporaryTable = super.getTable(temporaryNames); + } + } + if (temporaryTable != null) { + if (allowTemporaryTables) { + return temporaryTable; + } + throw UserException + .validationError() + .message("Temporary tables usage is disallowed. Used temporary table name: %s.", names) + .build(logger); + } + return super.getTable(names); + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java index b6ffde6..12c72c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -31,13 +31,17 @@ import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.rpc.user.UserSession; +import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillScreenRel; import org.apache.drill.exec.planner.logical.DrillWriterRel; @@ -67,43 +71,54 @@ public class CreateTableHandler extends DefaultSqlHandler { @Override public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException { SqlCreateTable sqlCreateTable = unwrap(sqlNode, SqlCreateTable.class); - final String newTblName = sqlCreateTable.getName(); + String originalTableName = sqlCreateTable.getName(); final ConvertedRelNode convertedRelNode = validateAndConvert(sqlCreateTable.getQuery()); final RelDataType validatedRowType = convertedRelNode.getValidatedRowType(); final RelNode queryRelNode = convertedRelNode.getConvertedNode(); - final RelNode newTblRelNode = SqlHandlerUtil.resolveNewTableRel(false, sqlCreateTable.getFieldNames(), validatedRowType, queryRelNode); - final AbstractSchema drillSchema = - SchemaUtilites.resolveToMutableDrillSchema(config.getConverter().getDefaultSchema(), - sqlCreateTable.getSchemaPath()); - 
final String schemaPath = drillSchema.getFullSchemaName(); + final DrillConfig drillConfig = context.getConfig(); + final AbstractSchema drillSchema = resolveSchema(sqlCreateTable, config.getConverter().getDefaultSchema(), drillConfig); - if (SqlHandlerUtil.getTableFromSchema(drillSchema, newTblName) != null) { - throw UserException.validationError() - .message("A table or view with given name [%s] already exists in schema [%s]", newTblName, schemaPath) - .build(logger); - } + checkDuplicatedObjectExistence(drillSchema, originalTableName, drillConfig, context.getSession()); - final RelNode newTblRelNodeWithPCol = SqlHandlerUtil.qualifyPartitionCol(newTblRelNode, sqlCreateTable.getPartitionColumns()); + final RelNode newTblRelNodeWithPCol = SqlHandlerUtil.qualifyPartitionCol(newTblRelNode, + sqlCreateTable.getPartitionColumns()); log("Calcite", newTblRelNodeWithPCol, logger, null); - // Convert the query to Drill Logical plan and insert a writer operator on top. - DrillRel drel = convertToDrel(newTblRelNodeWithPCol, drillSchema, newTblName, sqlCreateTable.getPartitionColumns(), newTblRelNode.getRowType()); + StorageStrategy storageStrategy = sqlCreateTable.isTemporary() ? + StorageStrategy.TEMPORARY : StorageStrategy.PERSISTENT; + + // If we are creating temporary table, initial table name will be replaced with generated table name. + // Generated table name is unique, UUID.randomUUID() is used for its generation. + // Original table name is stored in temporary tables cache, so it can be substituted to generated one during querying. + String newTableName = sqlCreateTable.isTemporary() ? 
+ context.getSession().registerTemporaryTable(drillSchema, originalTableName) : originalTableName; + + DrillRel drel = convertToDrel(newTblRelNodeWithPCol, drillSchema, newTableName, + sqlCreateTable.getPartitionColumns(), newTblRelNode.getRowType(), storageStrategy); Prel prel = convertToPrel(drel, newTblRelNode.getRowType(), sqlCreateTable.getPartitionColumns()); logAndSetTextPlan("Drill Physical", prel, logger); PhysicalOperator pop = convertToPop(prel); PhysicalPlan plan = convertToPlan(pop); log("Drill Plan", plan, logger); + String message = String.format("Creating %s table [%s].", + sqlCreateTable.isTemporary() ? "temporary" : "persistent", originalTableName); + logger.info(message); return plan; } - private DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String tableName, List<String> partitionColumns, RelDataType queryRowType) + private DrillRel convertToDrel(RelNode relNode, + AbstractSchema schema, + String tableName, + List<String> partitionColumns, + RelDataType queryRowType, + StorageStrategy storageStrategy) throws RelConversionException, SqlUnsupportedException { final DrillRel convertedRelNode = convertToDrel(relNode); @@ -114,7 +129,7 @@ public class CreateTableHandler extends DefaultSqlHandler { final RelTraitSet traits = convertedRelNode.getCluster().traitSet().plus(DrillRel.DRILL_LOGICAL); final DrillWriterRel writerRel = new DrillWriterRel(convertedRelNode.getCluster(), - traits, topPreservedNameProj, schema.createNewTable(tableName, partitionColumns)); + traits, topPreservedNameProj, schema.createNewTable(tableName, partitionColumns, storageStrategy)); return new DrillScreenRel(writerRel.getCluster(), writerRel.getTraitSet(), writerRel); } @@ -186,7 +201,7 @@ public class CreateTableHandler extends DefaultSqlHandler { return (Prel) prel.copy(projectUnderWriter.getTraitSet(), Collections.singletonList( (RelNode) projectUnderWriter)); } else { - // find list of partiiton columns. + // find list of partition columns. 
final List<RexNode> partitionColumnExprs = Lists.newArrayListWithExpectedSize(partitionColumns.size()); for (final String colName : partitionColumns) { final RelDataTypeField field = childRowType.getField(colName, false, false); @@ -242,4 +257,62 @@ public class CreateTableHandler extends DefaultSqlHandler { return node; } + /** + * Resolves schema taking into account type of table being created. + * If schema path wasn't indicated in sql call and table type to be created is temporary + * returns temporary workspace. + * + * If schema path is indicated, resolves to mutable drill schema. + * Though if table to be created is temporary table, checks if resolved schema is temporary, + * since temporary table are allowed to be created only in temporary workspace. + * + * @param sqlCreateTable create table call + * @param defaultSchema default schema + * @param config drill config + * @return resolved schema + * @throws UserException if attempted to create temporary table outside of temporary workspace + */ + private AbstractSchema resolveSchema(SqlCreateTable sqlCreateTable, SchemaPlus defaultSchema, DrillConfig config) { + if (sqlCreateTable.isTemporary() && sqlCreateTable.getSchemaPath().size() == 0) { + return SchemaUtilites.getTemporaryWorkspace(defaultSchema, config); + } else { + AbstractSchema resolvedSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, sqlCreateTable.getSchemaPath()); + boolean isTemporaryWorkspace = SchemaUtilites.isTemporaryWorkspace(resolvedSchema.getFullSchemaName(), config); + + if (sqlCreateTable.isTemporary() && !isTemporaryWorkspace) { + throw UserException + .validationError() + .message(String.format("Temporary tables are not allowed to be created " + + "outside of default temporary workspace [%s].", config.getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE))) + .build(logger); + } + return resolvedSchema; + } + } + + /** + * Checks if any object (persistent table / temporary table / view) + * with the same name as table 
to be created exists in indicated schema. + * + * @param drillSchema schema where table will be created + * @param tableName table name + * @param config drill config + * @param userSession current user session + * @throws UserException if duplicate is found + */ + private void checkDuplicatedObjectExistence(AbstractSchema drillSchema, + String tableName, + DrillConfig config, + UserSession userSession) { + String schemaPath = drillSchema.getFullSchemaName(); + boolean isTemporaryTable = userSession.isTemporaryTable(drillSchema, config, tableName); + + if (isTemporaryTable || SqlHandlerUtil.getTableFromSchema(drillSchema, tableName) != null) { + throw UserException + .validationError() + .message("A table or view with given name [%s] already exists in schema [%s]", + tableName, schemaPath) + .build(logger); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java index 517c183..a9895db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropTableHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,19 +18,21 @@ package org.apache.drill.exec.planner.sql.handlers; import java.io.IOException; +import java.util.List; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; -import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.planner.sql.DirectPlan; import org.apache.drill.exec.planner.sql.SchemaUtilites; import org.apache.drill.exec.planner.sql.parser.SqlDropTable; +import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.store.AbstractSchema; // SqlHandler for dropping a table. @@ -46,6 +48,7 @@ public class DropTableHandler extends DefaultSqlHandler { * Function resolves the schema and invokes the drop method * (while IF EXISTS statement is used function invokes the drop method only if table exists). * Raises an exception if the schema is immutable. 
+ * * @param sqlNode - SqlDropTable (SQL parse tree of drop table [if exists] query) * @return - Single row indicating drop succeeded or table is not found while IF EXISTS statement is used, * raise exception otherwise @@ -55,35 +58,51 @@ public class DropTableHandler extends DefaultSqlHandler { */ @Override public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException { - SqlDropTable dropTableNode = ((SqlDropTable) sqlNode); - SqlIdentifier tableIdentifier = dropTableNode.getTableIdentifier(); - + String originalTableName = dropTableNode.getName(); SchemaPlus defaultSchema = config.getConverter().getDefaultSchema(); - AbstractSchema drillSchema = null; + List<String> tableSchema = dropTableNode.getSchema(); + DrillConfig drillConfig = context.getConfig(); + UserSession session = context.getSession(); - if (tableIdentifier != null) { - drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, dropTableNode.getSchema()); - } - - String tableName = dropTableNode.getName(); - if (drillSchema == null) { - throw UserException.validationError() - .message("Invalid table_name [%s]", tableName) - .build(logger); - } + AbstractSchema temporarySchema = resolveToTemporarySchema(tableSchema, defaultSchema, drillConfig); + boolean isTemporaryTable = session.isTemporaryTable(temporarySchema, drillConfig, originalTableName); - if (dropTableNode.checkTableExistence()) { - final Table tableToDrop = SqlHandlerUtil.getTableFromSchema(drillSchema, tableName); + if (isTemporaryTable) { + session.removeTemporaryTable(temporarySchema, originalTableName); + } else { + AbstractSchema drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema); + Table tableToDrop = SqlHandlerUtil.getTableFromSchema(drillSchema, originalTableName); if (tableToDrop == null || tableToDrop.getJdbcTableType() != Schema.TableType.TABLE) { - return DirectPlan.createDirectPlan(context, true, - String.format("Table [%s] not 
found", tableName)); + if (dropTableNode.checkTableExistence()) { + return DirectPlan.createDirectPlan(context, false, String.format("Table [%s] not found", originalTableName)); + } else { + throw UserException.validationError().message("Table [%s] not found", originalTableName).build(logger); + } } + SqlHandlerUtil.dropTableFromSchema(drillSchema, originalTableName); } - drillSchema.dropTable(tableName); + String message = String.format("%s [%s] dropped", isTemporaryTable ? "Temporary table" : "Table", originalTableName); + logger.info(message); + return DirectPlan.createDirectPlan(context, true, message); + } - return DirectPlan.createDirectPlan(context, true, - String.format("Table [%s] %s", tableName, "dropped")); + /** + * If table schema is not indicated in sql call, returns temporary workspace. + * If schema is indicated, resolves to mutable table schema. + * + * @param tableSchema table schema + * @param defaultSchema default schema + * @param config drill config + * @return resolved schema + */ + private AbstractSchema resolveToTemporarySchema(List<String> tableSchema, SchemaPlus defaultSchema, DrillConfig config) { + if (tableSchema.size() == 0) { + return SchemaUtilites.getTemporaryWorkspace(defaultSchema, config); + } else { + return SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, tableSchema); + } } + } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java index ca7a510..04930a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java @@ 
-1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,31 +22,24 @@ import com.google.common.collect.Sets; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlWriter; -import org.apache.calcite.sql.TypedSqlNode; import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.tools.Planner; import org.apache.calcite.tools.RelConversionException; -import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.planner.common.DrillRelOptUtil; -import org.apache.drill.exec.planner.sql.DirectPlan; -import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl; import org.apache.drill.exec.store.AbstractSchema; import org.apache.calcite.tools.ValidationException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.sql.SqlNode; -import org.apache.drill.exec.store.ischema.Records; +import java.io.IOException; import java.util.AbstractList; import java.util.HashSet; import java.util.List; @@ -235,4 +228,42 @@ public class SqlHandlerUtil { writer.keyword(")"); } + /** + * Drops table from schema. + * If drop has failed makes concurrency check: checks if table still exists. 
+ * If table exists, throws {@link UserException} since drop was unsuccessful, + * otherwise assumes that other user had dropped the table and exits without error. + * + * @param drillSchema drill schema + * @param tableName table name + */ + public static void dropTableFromSchema(AbstractSchema drillSchema, String tableName) { + try { + drillSchema.dropTable(tableName); + } catch (Exception e) { + if (SqlHandlerUtil.getTableFromSchema(drillSchema, tableName) != null) { + throw e; + } + } + } + + /** + * Drops view from schema. + * If drop has failed makes concurrency check: checks if view still exists. + * If view exists, throws {@link UserException} since drop was unsuccessful, + * otherwise assumes that other user had dropped the view and exits without error. + * + * @param drillSchema drill schema + * @param viewName view name + */ + public static void dropViewFromSchema(AbstractSchema drillSchema, String viewName) throws IOException { + try { + drillSchema.dropView(viewName); + } catch (Exception e) { + if (SqlHandlerUtil.getTableFromSchema(drillSchema, viewName) != null) { + throw e; + } + } + } + + } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java index b8396e6..495e8b5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ViewHandler.java @@ -62,9 +62,10 @@ public abstract class ViewHandler extends DefaultSqlHandler { final String newViewName = createView.getName(); + // Disallow temporary tables usage in view definition + config.getConverter().disallowTemporaryTables(); // 
Store the viewSql as view def SqlNode is modified as part of the resolving the new table definition below. final String viewSql = createView.getQuery().toString(); - final ConvertedRelNode convertedRelNode = validateAndConvert(createView.getQuery()); final RelDataType validatedRowType = convertedRelNode.getValidatedRowType(); final RelNode queryRelNode = convertedRelNode.getConvertedNode(); @@ -74,36 +75,50 @@ public abstract class ViewHandler extends DefaultSqlHandler { final SchemaPlus defaultSchema = context.getNewDefaultSchema(); final AbstractSchema drillSchema = SchemaUtilites.resolveToMutableDrillSchema(defaultSchema, createView.getSchemaPath()); - final String schemaPath = drillSchema.getFullSchemaName(); final View view = new View(newViewName, viewSql, newViewRelNode.getRowType(), SchemaUtilites.getSchemaPathAsList(defaultSchema)); - final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, newViewName); - - if (existingTable != null) { - if (existingTable.getJdbcTableType() != Schema.TableType.VIEW) { - // existing table is not a view - throw UserException.validationError() - .message("A non-view table with given name [%s] already exists in schema [%s]", - newViewName, schemaPath) - .build(logger); - } - - if (existingTable.getJdbcTableType() == Schema.TableType.VIEW && !createView.getReplace()) { - // existing table is a view and create view has no "REPLACE" clause - throw UserException.validationError() - .message("A view with given name [%s] already exists in schema [%s]", - newViewName, schemaPath) - .build(logger); - } - } + validateViewCreationPossibility(drillSchema, createView, context); final boolean replaced = drillSchema.createView(view); final String summary = String.format("View '%s' %s successfully in '%s' schema", - createView.getName(), replaced ? "replaced" : "created", schemaPath); + createView.getName(), replaced ? 
"replaced" : "created", drillSchema.getFullSchemaName()); return DirectPlan.createDirectPlan(context, true, summary); } + + /** + * Validates if view can be created in indicated schema: + * checks if object (persistent / temporary table) with the same name exists + * or if view with the same name exists but replace flag is not set. + * + * @param drillSchema schema where views will be created + * @param view create view call + * @param context query context + * @throws UserException if view cannot be created in indicated schema + */ + private void validateViewCreationPossibility(AbstractSchema drillSchema, SqlCreateView view, QueryContext context) { + final String schemaPath = drillSchema.getFullSchemaName(); + final String viewName = view.getName(); + final Table existingTable = SqlHandlerUtil.getTableFromSchema(drillSchema, viewName); + + if ((existingTable != null && existingTable.getJdbcTableType() != Schema.TableType.VIEW) || + context.getSession().isTemporaryTable(drillSchema, context.getConfig(), viewName)) { + // existing table is not a view + throw UserException + .validationError() + .message("A non-view table with given name [%s] already exists in schema [%s]", viewName, schemaPath) + .build(logger); + } + + if ((existingTable != null && existingTable.getJdbcTableType() == Schema.TableType.VIEW) && !view.getReplace()) { + // existing table is a view and create view has no "REPLACE" clause + throw UserException + .validationError() + .message("A view with given name [%s] already exists in schema [%s]", viewName, schemaPath) + .build(logger); + } + } } /** Handler for Drop View [If Exists] DDL command. 
*/ @@ -124,7 +139,7 @@ public abstract class ViewHandler extends DefaultSqlHandler { final Table viewToDrop = SqlHandlerUtil.getTableFromSchema(drillSchema, viewName); if (dropView.checkViewExistence()) { if (viewToDrop == null || viewToDrop.getJdbcTableType() != Schema.TableType.VIEW){ - return DirectPlan.createDirectPlan(context, true, + return DirectPlan.createDirectPlan(context, false, String.format("View [%s] not found in schema [%s].", viewName, schemaPath)); } } else { @@ -139,7 +154,7 @@ public abstract class ViewHandler extends DefaultSqlHandler { } } - drillSchema.dropView(viewName); + SqlHandlerUtil.dropViewFromSchema(drillSchema, viewName); return DirectPlan.createDirectPlan(context, true, String.format("View [%s] deleted successfully from schema [%s].", viewName, schemaPath)); http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java index 53e3cd5..db934e2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -161,7 +161,7 @@ public class CompoundIdentifierConverter extends SqlShuttle { //SqlNode offset, //SqlNode fetch, rules.put(SqlSelect.class, R(D, E, D, E, E, E, E, E, D, D)); - rules.put(SqlCreateTable.class, R(D, D, D, E)); + rules.put(SqlCreateTable.class, R(D, D, D, E, D)); rules.put(SqlCreateView.class, R(D, E, E, D)); rules.put(SqlDescribeTable.class, R(D, D, E)); rules.put(SqlDropView.class, R(D, D)); http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java index 5835b10..bba60b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -48,8 +48,13 @@ public class SqlCreateTable extends DrillSqlCall { public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_TABLE", SqlKind.OTHER) { @Override public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... 
operands) { - Preconditions.checkArgument(operands.length == 4, "SqlCreateTable.createCall() has to get 4 operands!"); - return new SqlCreateTable(pos, (SqlIdentifier) operands[0], (SqlNodeList) operands[1], (SqlNodeList) operands[2], operands[3]); + Preconditions.checkArgument(operands.length == 5, "SqlCreateTable.createCall() has to get 5 operands!"); + return new SqlCreateTable(pos, + (SqlIdentifier) operands[0], + (SqlNodeList) operands[1], + (SqlNodeList) operands[2], + operands[3], + (SqlLiteral) operands[4]); } }; @@ -57,13 +62,20 @@ public class SqlCreateTable extends DrillSqlCall { private final SqlNodeList fieldList; private final SqlNodeList partitionColumns; private final SqlNode query; - - public SqlCreateTable(SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList, SqlNodeList partitionColumns, SqlNode query) { + private final SqlLiteral isTemporary; + + public SqlCreateTable(SqlParserPos pos, + SqlIdentifier tblName, + SqlNodeList fieldList, + SqlNodeList partitionColumns, + SqlNode query, + SqlLiteral isTemporary) { super(pos); this.tblName = tblName; this.fieldList = fieldList; this.partitionColumns = partitionColumns; this.query = query; + this.isTemporary = isTemporary; } @Override @@ -78,12 +90,16 @@ public class SqlCreateTable extends DrillSqlCall { ops.add(fieldList); ops.add(partitionColumns); ops.add(query); + ops.add(isTemporary); return ops; } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("CREATE"); + if (isTemporary.booleanValue()) { + writer.keyword("TEMPORARY"); + } writer.keyword("TABLE"); tblName.unparse(writer, leftPrec, rightPrec); if (fieldList.size() > 0) { @@ -142,4 +158,6 @@ public class SqlCreateTable extends DrillSqlCall { public SqlNode getQuery() { return query; } + public boolean isTemporary() { return isTemporary.booleanValue(); } + } http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 4e17249..281b124 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -299,6 +299,17 @@ public class UserServer extends BasicServer<RpcType, UserClientConnectionImpl> { public SocketAddress getRemoteAddress() { return getChannel().remoteAddress(); } + + @Override + public void closeSession() { + session.close(); + } + + @Override + public void close() { + closeSession(); + super.close(); + } } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java index 3bf9051..c3639d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -17,9 +17,14 @@ */ package org.apache.drill.exec.rpc.user; +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Paths; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; @@ -27,9 +32,13 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; import org.apache.calcite.tools.ValidationException; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.planner.sql.SchemaUtilites; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerUtil; import org.apache.drill.exec.proto.UserBitShared.UserCredentials; import org.apache.drill.exec.proto.UserProtos.Property; import org.apache.drill.exec.proto.UserProtos.UserProperties; @@ -37,8 +46,14 @@ import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.SessionOptionManager; import com.google.common.collect.Maps; - -public class UserSession { +import org.apache.drill.exec.store.AbstractSchema; +import org.apache.drill.exec.store.StorageStrategy; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class UserSession implements Closeable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class); public static final String SCHEMA = "schema"; @@ -54,18 +69,43 @@ public class UserSession { private Map<String, String> properties; private OptionManager sessionOptions; private final 
AtomicInteger queryCount; + private final String sessionId; + + /** Stores list of temporary tables, key is original table name converted to lower case to achieve case-insensitivity, + * value is generated table name. **/ + private final ConcurrentMap<String, String> temporaryTables; + /** Stores list of session temporary locations, key is path to location, value is file system associated with location. **/ + private final ConcurrentMap<Path, FileSystem> temporaryLocations; + + /** On session close deletes all session temporary locations recursively and clears temporary locations list. */ + @Override + public void close() { + for (Map.Entry<Path, FileSystem> entry : temporaryLocations.entrySet()) { + Path path = entry.getKey(); + FileSystem fs = entry.getValue(); + try { + fs.delete(path, true); + logger.info("Deleted session temporary location [{}] from file system [{}]", + path.toUri().getPath(), fs.getUri()); + } catch (Exception e) { + logger.warn("Error during session temporary location [{}] deletion from file system [{}]: [{}]", + path.toUri().getPath(), fs.getUri(), e.getMessage()); + } + } + temporaryLocations.clear(); + } /** * Implementations of this interface are allowed to increment queryCount. * {@link org.apache.drill.exec.work.user.UserWorker} should have a member that implements the interface. * No other core class should implement this interface. Test classes may implement (see ControlsInjectionUtil). 
*/ - public static interface QueryCountIncrementer { - public void increment(final UserSession session); + public interface QueryCountIncrementer { + void increment(final UserSession session); } public static class Builder { - UserSession userSession; + private UserSession userSession; public static Builder newBuilder() { return new Builder(); @@ -115,6 +155,9 @@ public class UserSession { private UserSession() { queryCount = new AtomicInteger(0); + sessionId = UUID.randomUUID().toString(); + temporaryTables = Maps.newConcurrentMap(); + temporaryLocations = Maps.newConcurrentMap(); } public boolean isSupportComplexTypes() { @@ -197,7 +240,7 @@ public class UserSession { /** * Get default schema from current default schema path and given schema tree. - * @param rootSchema + * @param rootSchema root schema * @return A {@link org.apache.calcite.schema.SchemaPlus} object. */ public SchemaPlus getDefaultSchema(SchemaPlus rootSchema) { @@ -207,18 +250,117 @@ public class UserSession { return null; } - final SchemaPlus defaultSchema = SchemaUtilites.findSchema(rootSchema, defaultSchemaPath); + return SchemaUtilites.findSchema(rootSchema, defaultSchemaPath); + } + + public boolean setSessionOption(String name, String value) { + return true; + } + + /** + * @return unique session identifier + */ + public String getSessionId() { return sessionId; } + + /** + * Creates and adds session temporary location if absent using schema configuration. + * Generates temporary table name and stores its original name as key + * and generated name as value in session temporary tables cache. + * Original temporary name is converted to lower case to achieve case-insensitivity. + * If original table name already exists, new name is not regenerated and is reused. + * This can happen if default temporary workspace was changed (file system or location) or + * orphan temporary table name has remained (name was registered but table creation did not succeed). 
+ * + * @param schema table schema + * @param tableName original table name + * @return generated temporary table name + * @throws IOException if error during session temporary location creation + */ + public String registerTemporaryTable(AbstractSchema schema, String tableName) throws IOException { + addTemporaryLocation((WorkspaceSchemaFactory.WorkspaceSchema) schema); + String temporaryTableName = Paths.get(sessionId, UUID.randomUUID().toString()).toString(); + String oldTemporaryTableName = temporaryTables.putIfAbsent(tableName.toLowerCase(), temporaryTableName); + return oldTemporaryTableName == null ? temporaryTableName : oldTemporaryTableName; + } + + /** + * Returns generated temporary table name from the list of session temporary tables, null otherwise. + * Original temporary name is converted to lower case to achieve case-insensitivity. + * + * @param tableName original table name + * @return generated temporary table name + */ + public String resolveTemporaryTableName(String tableName) { + return temporaryTables.get(tableName.toLowerCase()); + } - if (defaultSchema == null) { - // If the current schema resolves to null, return root schema as the current default schema. - return defaultSchema; + /** + * Checks if passed table is temporary, table name is case-insensitive. + * Before looking for table checks if passed schema is temporary and returns false if not + * since temporary tables are allowed to be created in temporary workspace only. + * If passed workspace is temporary, looks for temporary table. + * First checks if table name is among temporary tables, if not returns false. + * If temporary table name was resolved, checks that temporary table exists on disk, + * to ensure that temporary table actually exists and resolved table name is not orphan + * (for example, as a result of unsuccessful temporary table creation).
+ * + * @param drillSchema table schema + * @param config drill config + * @param tableName original table name + * @return true if temporary table exists in schema, false otherwise + */ + public boolean isTemporaryTable(AbstractSchema drillSchema, DrillConfig config, String tableName) { + if (!SchemaUtilites.isTemporaryWorkspace(drillSchema.getFullSchemaName(), config)) { + return false; } + String temporaryTableName = resolveTemporaryTableName(tableName); + if (temporaryTableName != null) { + Table temporaryTable = SqlHandlerUtil.getTableFromSchema(drillSchema, temporaryTableName); + if (temporaryTable != null && temporaryTable.getJdbcTableType() == Schema.TableType.TABLE) { + return true; + } + } + return false; + } - return defaultSchema; + /** + * Removes temporary table name from the list of session temporary tables. + * Original temporary name is converted to lower case to achieve case-insensitivity. + * + * @param tableName original table name + */ + public void removeTemporaryTable(AbstractSchema drillSchema, String tableName) { + String temporaryTable = resolveTemporaryTableName(tableName); + if (temporaryTable == null) { + return; + } + SqlHandlerUtil.dropTableFromSchema(drillSchema, temporaryTable); + temporaryTables.remove(tableName.toLowerCase()); } - public boolean setSessionOption(String name, String value) { - return true; + /** + * Session temporary tables are stored under temporary workspace location in session folder + * defined by unique session id. These session temporary locations are deleted on session close. + * If default temporary workspace file system or location is changed at runtime, + * new session temporary location will be added with corresponding file system + * to the list of session temporary locations. If location does not exist it will be created and + * {@link StorageStrategy#TEMPORARY} storage rules will be applied to it. 
+ * + * @param temporaryWorkspace temporary workspace + * @throws IOException in case of error during temporary location creation + */ + private void addTemporaryLocation(WorkspaceSchemaFactory.WorkspaceSchema temporaryWorkspace) throws IOException { + DrillFileSystem fs = temporaryWorkspace.getFS(); + Path temporaryLocation = new Path(Paths.get(fs.getUri().toString(), + temporaryWorkspace.getDefaultLocation(), sessionId).toString()); + + FileSystem fileSystem = temporaryLocations.putIfAbsent(temporaryLocation, fs); + + if (fileSystem == null) { + StorageStrategy.TEMPORARY.createPathAndApply(fs, temporaryLocation); + Preconditions.checkArgument(fs.exists(temporaryLocation), + String.format("Temporary location should exist [%s]", temporaryLocation.toUri().getPath())); + } } private String getProp(String key) { http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 547915e..25776ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -20,6 +20,7 @@ package org.apache.drill.exec.server; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.StackTrace; import org.apache.drill.common.config.DrillConfig; @@ -30,13 +31,17 @@ import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; import org.apache.drill.exec.exception.DrillbitStartupException; +import org.apache.drill.exec.planner.sql.SchemaUtilites; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.server.options.OptionValue.OptionType; import org.apache.drill.exec.server.rest.WebServer; import org.apache.drill.exec.service.ServiceEngine; +import org.apache.drill.exec.store.AbstractSchema; +import org.apache.drill.exec.store.SchemaTreeProvider; import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory; import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider; import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.store.sys.PersistentStoreRegistry; @@ -123,6 +128,7 @@ public class Drillbit implements AutoCloseable { storageRegistry.init(); drillbitContext.getOptionManager().init(); javaPropertiesToSystemOptions(); + validateTemporaryWorkspace(manager.getContext()); manager.getContext().getRemoteFunctionRegistry().init(context.getConfig(), storeProvider, coord); registrationHandle = coord.register(md); webServer.start(); @@ -215,6 +221,21 @@ public class Drillbit implements AutoCloseable { } /** + 
* Validates that temporary workspace indicated in configuration is + * mutable and file-based (instance of {@link WorkspaceSchemaFactory.WorkspaceSchema}). + * + * @param context drillbit context + * @throws Exception in case when temporary table schema is not mutable or + * not file-based (instance of WorkspaceSchemaFactory.WorkspaceSchema) + */ + private void validateTemporaryWorkspace(DrillbitContext context) throws Exception { + try (SchemaTreeProvider schemaTreeProvider = new SchemaTreeProvider(context)) { + final SchemaPlus rootSchema = schemaTreeProvider.createRootSchema(context.getOptionManager()); + SchemaUtilites.getTemporaryWorkspace(rootSchema, context.getConfig()); + } + } + + /** * Shutdown hook for Drillbit. Closes the drillbit, and reports on errors that * occur during closure, as well as the location the drillbit was started from. */ http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java index 7a16d0a..618841b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -117,18 +117,33 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer, } /** + * Creates table entry using table name, list of partition columns + * and storage strategy used to create table folder and files * * @param tableName : new table name. * @param partitionColumns : list of partition columns. 
Empty list if there are no partition columns. - * @return + * @param storageStrategy : storage strategy used to create table folder and files + * @return create table entry */ - public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) { + public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) { throw UserException.unsupportedError() .message("Creating new tables is not supported in schema [%s]", getSchemaPath()) .build(logger); } /** + * Creates table entry using table name and list of partition columns if any. + * Table folder and files will be created using persistent storage strategy. + * + * @param tableName : new table name. + * @param partitionColumns : list of partition columns. Empty list if there are no partition columns. + * @return create table entry + */ + public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) { + return createNewTable(tableName, partitionColumns, StorageStrategy.PERSISTENT); + } + + /** * Reports whether to show items from this schema in INFORMATION_SCHEMA * tables. * (Controls ... TODO: Doc.: Mention what this typically controls or http://git-wip-us.apache.org/repos/asf/drill/blob/bb29f19f/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java index d05cc43..4f426bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaTreeProvider.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,10 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.ViewExpansionContext; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.store.SchemaConfig.SchemaConfigInfoProvider; import org.apache.drill.exec.util.ImpersonationUtil; @@ -49,6 +52,33 @@ public class SchemaTreeProvider implements AutoCloseable { } /** + * Return root schema for process user. + * + * @param options list of options + * @return root of the schema tree + */ + public SchemaPlus createRootSchema(final OptionManager options) { + SchemaConfigInfoProvider schemaConfigInfoProvider = new SchemaConfigInfoProvider() { + + @Override + public ViewExpansionContext getViewExpansionContext() { + throw new UnsupportedOperationException("View expansion context is not supported"); + } + + @Override + public OptionValue getOption(String optionKey) { + return options.getOption(optionKey); + } + }; + + final SchemaConfig schemaConfig = SchemaConfig.newBuilder( + ImpersonationUtil.getProcessUserName(), schemaConfigInfoProvider) + .build(); + + return createRootSchema(schemaConfig); + } + + /** * Return root schema with schema owner as the given user. * * @param userName Name of the user who is accessing the storage sources.