This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new d4a2aef DRILL-7168: Implement ALTER SCHEMA ADD / REMOVE commands
d4a2aef is described below
commit d4a2aefd530dae6bdb0a3f115bd8ef8643dbc364
Author: Arina Ielchiieva <[email protected]>
AuthorDate: Thu Aug 29 16:15:43 2019 +0300
DRILL-7168: Implement ALTER SCHEMA ADD / REMOVE commands
---
exec/java-exec/src/main/codegen/data/Parser.tdd | 9 +-
.../src/main/codegen/includes/parserImpls.ftl | 121 +++++-
.../exec/planner/sql/handlers/SchemaHandler.java | 182 ++++++++-
.../exec/planner/sql/handlers/SqlHandlerUtil.java | 22 ++
.../sql/parser/CompoundIdentifierConverter.java | 2 +
.../drill/exec/planner/sql/parser/SqlSchema.java | 228 +++++++++--
.../metadata/schema/InlineSchemaProvider.java | 4 +-
.../record/metadata/schema/PathSchemaProvider.java | 7 +-
.../record/metadata/schema/SchemaProvider.java | 8 +-
.../record/metadata/schema/StorageProperties.java | 71 ++++
.../java/org/apache/drill/TestSchemaCommands.java | 415 ++++++++++++++++++++-
.../exec/record/metadata/TestTupleSchema.java | 64 ++++
.../record/metadata/schema/TestSchemaProvider.java | 23 +-
.../record/metadata/AbstractColumnMetadata.java | 1 +
.../exec/record/metadata/AbstractPropertied.java | 5 +
.../drill/exec/record/metadata/Propertied.java | 8 +-
.../drill/exec/record/metadata/TupleSchema.java | 1 +
17 files changed, 1098 insertions(+), 73 deletions(-)
diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd
index 2da27c3..3480754 100644
--- a/exec/java-exec/src/main/codegen/data/Parser.tdd
+++ b/exec/java-exec/src/main/codegen/data/Parser.tdd
@@ -43,7 +43,8 @@
"ESTIMATE",
"STATISTICS",
"SAMPLE",
- "COLUMNS"
+ "COLUMNS",
+ "REMOVE"
]
# List of methods for parsing custom SQL statements.
@@ -61,7 +62,8 @@
"SqlDropFunction()",
"SqlAnalyzeTable()",
"DrillSqlSetOption(Span.of(), null)",
- "DrillSqlResetOption(Span.of(), null)"
+ "DrillSqlResetOption(Span.of(), null)",
+ "SqlAlterSchema()"
]
# List of methods for parsing custom literals.
@@ -859,7 +861,8 @@
"YEAR",
# "YEARS", # not a keyword in Calcite
"ZONE",
- "COLUMNS"
+ "COLUMNS",
+ "REMOVE"
]
# List of additional join types. Each is a method with no arguments.
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index be30318..70e0c28 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -314,12 +314,12 @@ SqlNode SqlCreateSchema(SqlParserPos pos, String createType) :
token_source.SwitchTo(SCH);
}
(
- <LOAD>
+ <SCH_LOAD>
{
load = StringLiteral();
}
|
- <PAREN_STRING>
+ <SCH_PAREN_STRING>
{
schema = SqlLiteral.createCharString(token.image, getPos());
}
@@ -373,13 +373,13 @@ void addProperty(SqlNodeList properties) :
}
<SCH> TOKEN : {
- < LOAD: "LOAD" > { popState(); }
- | < NUM: <DIGIT> (" " | "\t" | "\n" | "\r")* >
+ < SCH_LOAD: "LOAD" > { popState(); }
+ | < SCH_NUM: <DIGIT> (" " | "\t" | "\n" | "\r")* >
// once schema is found, switch back to initial lexical state
// must be enclosed in the parentheses
// inside may have left parenthesis only if number precedes (covers cases with varchar(10)),
// if left parenthesis is present in column name, it must be escaped with backslash
- | < PAREN_STRING: <LPAREN> ((~[")"]) | (<NUM> ")") | ("\\)"))* <RPAREN> > { popState(); }
+ | < SCH_PAREN_STRING: <LPAREN> ((~[")"]) | (<SCH_NUM> ")") | ("\\)"))* <RPAREN> > { popState(); }
}
/**
@@ -465,7 +465,7 @@ SqlNode SqlDropSchema(SqlParserPos pos) :
/**
* Parse refresh table metadata statement.
- * REFRESH TABLE METADATA [COLUMNS ((field1, field2,..) | NONE)] tblname
+ * REFRESH TABLE METADATA [COLUMNS ((field1, field2,..) | NONE)] table_name
*/
SqlNode SqlRefreshMetadata() :
{
@@ -535,6 +535,110 @@ SqlNode SqlDescribeSchema() :
}
/**
+* Parses ALTER SCHEMA statements:
+*
+* ALTER SCHEMA
+* (FOR TABLE dfs.tmp.nation | PATH '/tmp/schema.json')
+* ADD [OR REPLACE]
+* [COLUMNS (col1 int, col2 varchar)]
+* [PROPERTIES ('prop1'='val1', 'prop2'='val2')]
+*
+* ALTER SCHEMA
+* (FOR TABLE dfs.tmp.nation | PATH '/tmp/schema.json')
+* REMOVE
+* [COLUMNS (`col1`, `col2`)]
+* [PROPERTIES ('prop1', 'prop2')]
+*/
+SqlNode SqlAlterSchema() :
+{
+ SqlParserPos pos;
+ SqlIdentifier table = null;
+ SqlNode path = null;
+}
+{
+ <ALTER> { pos = getPos(); }
+ <SCHEMA>
+ (
+ <FOR> <TABLE> { table = CompoundIdentifier(); }
+ |
+ <PATH> { path = StringLiteral(); }
+ )
+ (
+ <ADD> { return SqlAlterSchemaAdd(pos, table, path); }
+ |
+ <REMOVE> { return SqlAlterSchemaRemove(pos, table, path); }
+ )
+}
+
+SqlNode SqlAlterSchemaAdd(SqlParserPos pos, SqlIdentifier table, SqlNode path) :
+{
+ boolean replace = false;
+ SqlCharStringLiteral schema = null;
+ SqlNodeList properties = null;
+}
+{
+ [ <OR> <REPLACE> { replace = true; } ]
+ [ <COLUMNS> { schema = ParseSchema(); } ]
+ [
+ <PROPERTIES> <LPAREN>
+ {
+ properties = new SqlNodeList(getPos());
+ addProperty(properties);
+ }
+ (
+ <COMMA> { addProperty(properties); }
+ )*
+ <RPAREN>
+ ]
+ {
+ if (schema == null && properties == null) {
+ throw new ParseException("ALTER SCHEMA ADD command must specify at least one of <COLUMNS> or <PROPERTIES>.");
+ }
+ return new SqlSchema.Add(pos, table, path, SqlLiteral.createBoolean(replace, getPos()), schema, properties);
+ }
+}
+
+SqlCharStringLiteral ParseSchema() :
+{}
+{
+ {
+ token_source.pushState();
+ token_source.SwitchTo(SCH);
+ }
+ <SCH_PAREN_STRING>
+ {
+ return SqlLiteral.createCharString(token.image, getPos());
+ }
+}
+
+SqlNode SqlAlterSchemaRemove(SqlParserPos pos, SqlIdentifier table, SqlNode path) :
+{
+ SqlNodeList columns = null;
+ SqlNodeList properties = null;
+}
+{
+ [ <COLUMNS> { columns = ParseRequiredFieldList("Schema"); } ]
+ [
+ <PROPERTIES> <LPAREN>
+ {
+ properties = new SqlNodeList(getPos());
+ properties.add(StringLiteral());
+ }
+ (
+ <COMMA>
+ { properties.add(StringLiteral()); }
+ )*
+ <RPAREN>
+ ]
+ {
+ if (columns == null && properties == null) {
+ throw new ParseException("ALTER SCHEMA REMOVE command must specify at least one of <COLUMNS> or <PROPERTIES>.");
+ }
+ return new SqlSchema.Remove(pos, table, path, columns, properties);
+ }
+}
+
+/**
* Parse create UDF statement
* CREATE FUNCTION USING JAR 'jar_name'
*/
@@ -592,9 +696,10 @@ Pair<SqlNodeList, SqlNodeList> ParenthesizedCompoundIdentifierList() :
}
}
</#if>
+
/**
* Parses a analyze statement.
- * ANALYZE TABLE tblname {COMPUTE | ESTIMATE} | STATISTICS
+ * ANALYZE TABLE table_name {COMPUTE | ESTIMATE} | STATISTICS
* [(column1, column2, ...)] [ SAMPLE numeric PERCENT ]
*/
SqlNode SqlAnalyzeTable() :
@@ -695,5 +800,3 @@ DrillSqlResetOption DrillSqlResetOption(Span s, String scope) :
return new DrillSqlResetOption(s.end(name), scope, name);
}
}
-
-
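For illustration, the grammar above accepts statements such as the following, issued here through the run() helper that the tests later in this patch use (the table and path names are placeholders):

  run("alter schema for table dfs.tmp.nation add or replace " +
      "columns (col1 int, col2 varchar) " +
      "properties ('prop1' = 'val1')");

  run("alter schema path '/tmp/schema.json' remove " +
      "columns (`col1`) " +
      "properties ('prop1')");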
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java
index cbc311d..3487234 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java
@@ -31,10 +31,13 @@ import org.apache.drill.exec.planner.sql.parser.SqlCreateType;
import org.apache.drill.exec.planner.sql.parser.SqlSchema;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.record.metadata.schema.InlineSchemaProvider;
import org.apache.drill.exec.record.metadata.schema.PathSchemaProvider;
import org.apache.drill.exec.record.metadata.schema.SchemaContainer;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
import org.apache.drill.exec.record.metadata.schema.SchemaProviderFactory;
+import org.apache.drill.exec.record.metadata.schema.StorageProperties;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
@@ -43,20 +46,25 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
- * Parent class for CREATE / DROP / DESCRIBE SCHEMA handlers.
+ * Parent class for CREATE / DROP / DESCRIBE / ALTER SCHEMA handlers.
* Contains common logic on how extract workspace, output error result.
*/
public abstract class SchemaHandler extends DefaultSqlHandler {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaHandler.class);
+ static final Logger logger = LoggerFactory.getLogger(SchemaHandler.class);
SchemaHandler(SqlHandlerConfig config) {
super(config);
@@ -94,6 +102,17 @@ public abstract class SchemaHandler extends DefaultSqlHandler {
}
/**
+ * Provides the storage strategy which creates the schema file
+ * with the same permissions as used for persistent tables.
+ *
+ * @return storage strategy
+ */
+ StorageStrategy getStorageStrategy() {
+ return new StorageStrategy(context.getOption(
+ ExecConstants.PERSISTENT_TABLE_UMASK).string_val, false);
+ }
+
+ /**
* CREATE SCHEMA command handler.
*/
public static class Create extends SchemaHandler {
@@ -120,10 +139,13 @@ public abstract class SchemaHandler extends DefaultSqlHandler {
}
}
- // schema file will be created with same permission as used for persistent tables
- StorageStrategy storageStrategy = new StorageStrategy(context.getOption(
- ExecConstants.PERSISTENT_TABLE_UMASK).string_val, false);
- schemaProvider.store(schemaString, sqlCall.getProperties(), storageStrategy);
+ StorageProperties storageProperties = StorageProperties.builder()
+ .storageStrategy(getStorageStrategy())
+ .overwrite(false)
+ .build();
+
+ schemaProvider.store(schemaString, sqlCall.getProperties(), storageProperties);
+
return DirectPlan.createDirectPlan(context, true, String.format("Created schema for [%s]", schemaSource));
} catch (IOException e) {
throw UserException.resourceError(e)
@@ -294,7 +316,155 @@ public abstract class SchemaHandler extends DefaultSqlHandler {
this.schema = schema;
}
}
+ }
+
+ /**
+ * ALTER SCHEMA ADD command handler.
+ */
+ public static class Add extends SchemaHandler {
+
+ public Add(SqlHandlerConfig config) {
+ super(config);
+ }
+
+ @Override
+ public PhysicalPlan getPlan(SqlNode sqlNode) {
+ SqlSchema.Add addCall = ((SqlSchema.Add) sqlNode);
+ String schemaSource = addCall.hasTable() ? addCall.getTable().toString() : addCall.getPath();
+
+ try {
+ SchemaProvider schemaProvider = SchemaProviderFactory.create(addCall, this);
+ if (!schemaProvider.exists()) {
+ throw UserException.resourceError()
+ .message("Schema does not exist for [%s]", schemaSource)
+ .addContext("Command: ALTER SCHEMA ADD")
+ .build(logger);
+ }
+
+ TupleMetadata currentSchema = schemaProvider.read().getSchema();
+ TupleMetadata newSchema = new TupleSchema();
+
+ if (addCall.hasSchema()) {
+ InlineSchemaProvider inlineSchemaProvider = new InlineSchemaProvider(addCall.getSchema());
+ TupleMetadata providedSchema = inlineSchemaProvider.read().getSchema();
+
+ if (addCall.isReplace()) {
+ Map<String, ColumnMetadata> columnsMap = Stream.concat(currentSchema.toMetadataList().stream(), providedSchema.toMetadataList().stream())
+ .collect(Collectors.toMap(
+ ColumnMetadata::name,
+ Function.identity(),
+ (o, n) -> n, // replace existing columns
+ LinkedHashMap::new)); // preserve initial order of the columns
+ columnsMap.values().forEach(newSchema::addColumn);
+ } else {
+ Stream.concat(currentSchema.toMetadataList().stream(), providedSchema.toMetadataList().stream())
+ .forEach(newSchema::addColumn);
+ }
+ } else {
+ currentSchema.toMetadataList().forEach(newSchema::addColumn);
+ }
+
+ if (addCall.hasProperties()) {
+ if (addCall.isReplace()) {
+ newSchema.setProperties(currentSchema.properties());
+ newSchema.setProperties(addCall.getProperties());
+ } else {
+ Map<String, String> newProperties = Stream.concat(currentSchema.properties().entrySet().stream(), addCall.getProperties().entrySet().stream())
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue)); // no merge function is given, so duplicate keys fail
+ newSchema.setProperties(newProperties);
+ }
+ } else {
+ newSchema.setProperties(currentSchema.properties());
+ }
+
+ String schemaString = newSchema.toMetadataList().stream()
+ .map(ColumnMetadata::columnString)
+ .collect(Collectors.joining(", "));
+
+ StorageProperties storageProperties = StorageProperties.builder()
+ .storageStrategy(getStorageStrategy())
+ .overwrite()
+ .build();
+
+ schemaProvider.store(schemaString, newSchema.properties(), storageProperties);
+ return DirectPlan.createDirectPlan(context, true, String.format("Schema for [%s] was updated", schemaSource));
+
+ } catch (IOException e) {
+ throw UserException.resourceError(e)
+ .message("Error while accessing / modifying schema for [%s]: %s", schemaSource, e.getMessage())
+ .build(logger);
+ } catch (IllegalArgumentException | IllegalStateException e) {
+ throw UserException.validationError(e)
+ .message(e.getMessage())
+ .addContext("Error while preparing / creating schema for [%s]", schemaSource)
+ .build(logger);
+ }
+ }
}
+ /**
+ * ALTER SCHEMA REMOVE command handler.
+ */
+ public static class Remove extends SchemaHandler {
+
+ public Remove(SqlHandlerConfig config) {
+ super(config);
+ }
+
+ @Override
+ public PhysicalPlan getPlan(SqlNode sqlNode) {
+ SqlSchema.Remove removeCall = ((SqlSchema.Remove) sqlNode);
+ String schemaSource = removeCall.hasTable() ? removeCall.getTable().toString() : removeCall.getPath();
+
+ try {
+ SchemaProvider schemaProvider = SchemaProviderFactory.create(removeCall, this);
+
+ if (!schemaProvider.exists()) {
+ throw UserException.resourceError()
+ .message("Schema does not exist for [%s]", schemaSource)
+ .addContext("Command: ALTER SCHEMA REMOVE")
+ .build(logger);
+ }
+
+ TupleMetadata currentSchema = schemaProvider.read().getSchema();
+ TupleMetadata newSchema = new TupleSchema();
+
+ List<String> columns = removeCall.getColumns();
+
+ currentSchema.toMetadataList().stream()
+ .filter(column -> columns == null || !columns.contains(column.name()))
+ .forEach(newSchema::addColumn);
+
+ newSchema.setProperties(currentSchema.properties());
+ if (removeCall.hasProperties()) {
+ removeCall.getProperties().forEach(newSchema::removeProperty);
+ }
+
+ StorageProperties storageProperties = StorageProperties.builder()
+ .storageStrategy(getStorageStrategy())
+ .overwrite()
+ .build();
+
+ String schemaString = newSchema.toMetadataList().stream()
+ .map(ColumnMetadata::columnString)
+ .collect(Collectors.joining(", "));
+
+ schemaProvider.store(schemaString, newSchema.properties(), storageProperties);
+ return DirectPlan.createDirectPlan(context, true, String.format("Schema for [%s] was updated", schemaSource));
+
+ } catch (IOException e) {
+ throw UserException.resourceError(e)
+ .message("Error while accessing / modifying schema for [%s]: %s", schemaSource, e.getMessage())
+ .build(logger);
+ } catch (IllegalArgumentException e) {
+ throw UserException.validationError(e)
+ .message(e.getMessage())
+ .addContext("Error while preparing / creating schema for [%s]", schemaSource)
+ .build(logger);
+ }
+ }
+ }
}
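A note on the merge logic above: ADD OR REPLACE collects both column lists into a LinkedHashMap so that a clashing name takes the new definition while keeping its original position, and the plain ADD path for properties deliberately passes no merge function to Collectors.toMap, so a duplicate key fails with IllegalStateException (surfaced as a validation error). A self-contained sketch of the order-preserving replace, with made-up column strings:

  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.function.Function;
  import java.util.stream.Collectors;
  import java.util.stream.Stream;

  public class MergeOrderSketch {
    public static void main(String[] args) {
      List<String> current = List.of("col1 int", "col2 int");
      List<String> provided = List.of("col2 varchar", "col3 boolean");
      Map<String, String> merged = Stream.concat(current.stream(), provided.stream())
          .collect(Collectors.toMap(
              s -> s.split(" ")[0],        // key by column name
              Function.identity(),
              (oldCol, newCol) -> newCol,  // OR REPLACE: new definition wins
              LinkedHashMap::new));        // original column order preserved
      System.out.println(merged.values()); // [col1 int, col2 varchar, col3 boolean]
    }
  }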
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
index 5981206..43d2f2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
@@ -272,4 +272,26 @@ public class SqlHandlerUtil {
}
}
+ /**
+ * Unparses given {@link SqlNodeList} into key / value pairs: (k1 = v1, k2 = v2).
+ *
+ * @param writer sql writer
+ * @param leftPrec left precedence
+ * @param rightPrec right precedence
+ * @param list sql node list
+ */
+ public static void unparseKeyValuePairs(SqlWriter writer, int leftPrec, int rightPrec, SqlNodeList list) {
+ writer.keyword("(");
+
+ for (int i = 1; i < list.size(); i += 2) {
+ if (i != 1) {
+ writer.keyword(",");
+ }
+ list.get(i - 1).unparse(writer, leftPrec, rightPrec);
+ writer.keyword("=");
+ list.get(i).unparse(writer, leftPrec, rightPrec);
+ }
+
+ writer.keyword(")");
+ }
}
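unparseKeyValuePairs treats the SqlNodeList as a flattened sequence k1, v1, k2, v2, ...; the snippet below shows the same index pattern on a plain list (illustration only, not Drill API):

  List<String> list = List.of("prop1", "a", "prop2", "b");
  StringBuilder sb = new StringBuilder("(");
  for (int i = 1; i < list.size(); i += 2) {
    if (i != 1) {
      sb.append(", ");
    }
    sb.append(list.get(i - 1)).append(" = ").append(list.get(i)); // key = value
  }
  sb.append(")");
  System.out.println(sb); // (prop1 = a, prop2 = b)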
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
index ac0d163..e9f586e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
@@ -83,6 +83,8 @@ public class CompoundIdentifierConverter extends SqlShuttle {
.put(SqlSchema.Create.class, arrayOf(D, D, D, D, D, D))
.put(SqlSchema.Drop.class, arrayOf(D, D))
.put(SqlSchema.Describe.class, arrayOf(D, D))
+ .put(SqlSchema.Add.class, arrayOf(D, D, D, D, D, D))
+ .put(SqlSchema.Remove.class, arrayOf(D, D, D, D, D))
.build();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java
index 81e8910..44a3adb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java
@@ -32,6 +32,7 @@ import org.apache.calcite.sql.util.SqlBasicVisitor;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
import org.apache.drill.exec.planner.sql.handlers.SchemaHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerUtil;
import org.apache.drill.exec.store.dfs.FileSelection;
import java.util.Arrays;
@@ -39,9 +40,10 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
- * Parent class for CREATE, DROP, DESCRIBE SCHEMA commands.
+ * Parent class for CREATE, DROP, DESCRIBE, ALTER SCHEMA commands.
* Holds logic common command property: table, path.
*/
public abstract class SqlSchema extends DrillSqlCall {
@@ -61,6 +63,11 @@ public abstract class SqlSchema extends DrillSqlCall {
writer.keyword("FOR TABLE");
table.unparse(writer, leftPrec, rightPrec);
}
+
+ if (path != null) {
+ writer.keyword("PATH");
+ path.unparse(writer, leftPrec, rightPrec);
+ }
}
public boolean hasTable() {
@@ -90,6 +97,20 @@ public abstract class SqlSchema extends DrillSqlCall {
return path == null ? null : path.accept(LiteralVisitor.INSTANCE);
}
+ protected Map<String, String> getProperties(SqlNodeList properties) {
+ if (properties == null) {
+ return null;
+ }
+
+ // preserve properties order
+ Map<String, String> map = new LinkedHashMap<>();
+ for (int i = 1; i < properties.size(); i += 2) {
+ map.put(properties.get(i - 1).accept(LiteralVisitor.INSTANCE),
+ properties.get(i).accept(LiteralVisitor.INSTANCE));
+ }
+ return map;
+ }
+
/**
* Visits literal and returns bare value (i.e. single quotes).
*/
@@ -155,35 +176,21 @@ public abstract class SqlSchema extends DrillSqlCall {
writer.keyword("REPLACE");
}
- writer.keyword("SCHEMA");
- writer.literal(getSchema());
-
- super.unparse(writer, leftPrec, rightPrec);
+ if (schema != null) {
+ writer.keyword("SCHEMA");
+ writer.literal(getSchema());
+ }
if (load != null) {
writer.keyword("LOAD");
load.unparse(writer, leftPrec, rightPrec);
}
- if (path != null) {
- writer.keyword("PATH");
- path.unparse(writer, leftPrec, rightPrec);
- }
+ super.unparse(writer, leftPrec, rightPrec);
if (properties != null) {
writer.keyword("PROPERTIES");
- writer.keyword("(");
-
- for (int i = 1; i < properties.size(); i += 2) {
- if (i != 1) {
- writer.keyword(",");
- }
- properties.get(i - 1).unparse(writer, leftPrec, rightPrec);
- writer.keyword("=");
- properties.get(i).unparse(writer, leftPrec, rightPrec);
- }
-
- writer.keyword(")");
+ SqlHandlerUtil.unparseKeyValuePairs(writer, leftPrec, rightPrec, properties);
}
}
@@ -205,23 +212,12 @@ public abstract class SqlSchema extends DrillSqlCall {
}
public Map<String, String> getProperties() {
- if (properties == null) {
- return null;
- }
-
- // preserve properties order
- Map<String, String> map = new LinkedHashMap<>();
- for (int i = 1; i < properties.size(); i += 2) {
- map.put(properties.get(i - 1).accept(LiteralVisitor.INSTANCE),
- properties.get(i).accept(LiteralVisitor.INSTANCE));
- }
- return map;
+ return getProperties(properties);
}
public SqlCreateType getSqlCreateType() {
return SqlCreateType.valueOf(createType.toValue());
}
-
}
/**
@@ -274,7 +270,6 @@ public abstract class SqlSchema extends DrillSqlCall {
public boolean ifExists() {
return existenceCheck.booleanValue();
}
-
}
/**
@@ -339,4 +334,169 @@ public abstract class SqlSchema extends DrillSqlCall {
}
}
+ public static class Add extends SqlSchema {
+
+ private final SqlLiteral replace;
+ private final SqlCharStringLiteral schema;
+ private final SqlNodeList properties;
+
+ public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER_SCHEMA_ADD", SqlKind.OTHER_DDL) {
+ @Override
+ public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+ return new Add(pos, (SqlIdentifier) operands[0], operands[1], (SqlLiteral) operands[2],
+ (SqlCharStringLiteral) operands[3], (SqlNodeList) operands[4]);
+ }
+ };
+
+ public Add(SqlParserPos pos,
+ SqlIdentifier table,
+ SqlNode path,
+ SqlLiteral replace,
+ SqlCharStringLiteral schema,
+ SqlNodeList properties) {
+ super(pos, table, path);
+ this.replace = replace;
+ this.schema = schema;
+ this.properties = properties;
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return Arrays.asList(table, path, replace, schema, properties);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("ALTER");
+ writer.keyword("SCHEMA");
+ writer.keyword("ADD");
+
+ if (replace.booleanValue()) {
+ writer.keyword("OR");
+ writer.keyword("REPLACE");
+ }
+
+ super.unparse(writer, leftPrec, rightPrec);
+
+ if (schema != null) {
+ writer.keyword("COLUMNS");
+ writer.literal(getSchema());
+ }
+
+ if (properties != null) {
+ writer.keyword("PROPERTIES");
+ SqlHandlerUtil.unparseKeyValuePairs(writer, leftPrec, rightPrec, properties);
+ }
+ }
+
+ @Override
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new SchemaHandler.Add(config);
+ }
+
+ public boolean isReplace() {
+ return replace.booleanValue();
+ }
+
+ public boolean hasSchema() {
+ return schema != null;
+ }
+
+ public String getSchema() {
+ return hasSchema() ? schema.toValue() : null;
+ }
+
+ public boolean hasProperties() {
+ return properties != null;
+ }
+
+ public Map<String, String> getProperties() {
+ return getProperties(properties);
+ }
+ }
+
+ public static class Remove extends SqlSchema {
+
+ private final SqlNodeList columns;
+ private final SqlNodeList properties;
+
+ public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER_SCHEMA_REMOVE", SqlKind.OTHER_DDL) {
+ @Override
+ public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+ return new Remove(pos, (SqlIdentifier) operands[0], operands[1],
+ (SqlNodeList) operands[2], (SqlNodeList) operands[3]);
+ }
+ };
+
+ public Remove(SqlParserPos pos,
+ SqlIdentifier table,
+ SqlNode path,
+ SqlNodeList columns,
+ SqlNodeList properties) {
+ super(pos, table, path);
+ this.columns = columns;
+ this.properties = properties;
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return Arrays.asList(table, path, columns, properties);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("ALTER");
+ writer.keyword("SCHEMA");
+ writer.keyword("REMOVE");
+
+ super.unparse(writer, leftPrec, rightPrec);
+
+ if (columns != null) {
+ writer.keyword("COLUMNS");
+ SqlHandlerUtil.unparseSqlNodeList(writer, leftPrec, rightPrec, columns);
+ }
+
+ if (properties != null) {
+ writer.keyword("PROPERTIES");
+ SqlHandlerUtil.unparseSqlNodeList(writer, leftPrec, rightPrec, properties);
+ }
+ }
+
+ @Override
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new SchemaHandler.Remove(config);
+ }
+
+ public List<String> getColumns() {
+ if (columns == null) {
+ return null;
+ }
+ return columns.getList().stream()
+ .map(SqlNode::toString)
+ .collect(Collectors.toList());
+ }
+
+ public boolean hasProperties() {
+ return properties != null;
+ }
+
+ public List<String> getProperties() {
+ if (properties == null) {
+ return null;
+ }
+ return properties.getList().stream()
+ .map(property -> property.accept(LiteralVisitor.INSTANCE))
+ .collect(Collectors.toList());
+ }
+ }
}
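For reference, with the unparse methods above an Add call renders as ALTER SCHEMA ADD [OR REPLACE], then the inherited FOR TABLE / PATH clause, then COLUMNS and PROPERTIES, e.g. (values assumed):

  ALTER SCHEMA ADD OR REPLACE FOR TABLE dfs.tmp.nation COLUMNS (...) PROPERTIES ('prop1' = 'val1')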
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java
index a24f5a2..8678028 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.record.metadata.schema;
-import org.apache.drill.exec.store.StorageStrategy;
-
import java.io.IOException;
import java.util.Map;
@@ -39,7 +37,7 @@ public class InlineSchemaProvider implements SchemaProvider {
}
@Override
- public void store(String schema, Map<String, String> properties, StorageStrategy storageStrategy) {
+ public void store(String schema, Map<String, String> properties, StorageProperties storageProperties) {
throw new UnsupportedOperationException("Schema storage is not supported");
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java
index 8a8933a..fe2d920 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
-import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -103,13 +102,13 @@ public class PathSchemaProvider implements SchemaProvider {
}
@Override
- public void store(String schema, Map<String, String> properties, StorageStrategy storageStrategy) throws IOException {
+ public void store(String schema, Map<String, String> properties, StorageProperties storageProperties) throws IOException {
SchemaContainer tableSchema = createTableSchema(schema, properties);
- try (OutputStream stream = fs.create(path, false)) {
+ try (OutputStream stream = fs.create(path, storageProperties.isOverwrite())) {
WRITER.writeValue(stream, tableSchema);
}
- storageStrategy.applyToFile(fs, path);
+ storageProperties.getStorageStrategy().applyToFile(fs, path);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java
index 343e0ed..66b5c56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProvider.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.record.metadata.schema;
-import org.apache.drill.exec.store.StorageStrategy;
-
import java.io.IOException;
import java.util.Map;
@@ -40,13 +38,12 @@ public interface SchemaProvider {
/**
* Stores given schema definition and properties.
- * If schema is stored in a file, will apply certain permission using {@link StorageStrategy}.
*
* @param schema schema definition
* @param properties map of properties
- * @param storageStrategy storage strategy
+ * @param storageProperties storage properties
*/
- void store(String schema, Map<String, String> properties, StorageStrategy storageStrategy) throws IOException;
+ void store(String schema, Map<String, String> properties, StorageProperties storageProperties) throws IOException;
/**
* Reads schema into {@link SchemaContainer}. Depending on implementation, can read from a file
@@ -62,5 +59,4 @@ public interface SchemaProvider {
* @return true if schema exists, false otherwise
*/
boolean exists() throws IOException;
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/StorageProperties.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/StorageProperties.java
new file mode 100644
index 0000000..aa8b4d6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/StorageProperties.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record.metadata.schema;
+
+import org.apache.drill.exec.store.StorageStrategy;
+
+/**
+ * Holds storage properties used when writing schema container.
+ */
+public class StorageProperties {
+
+ private final StorageStrategy storageStrategy;
+ private final boolean overwrite;
+
+ private StorageProperties(Builder builder) {
+ this.storageStrategy = builder.storageStrategy;
+ this.overwrite = builder.overwrite;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public StorageStrategy getStorageStrategy() {
+ return storageStrategy;
+ }
+
+ public boolean isOverwrite() {
+ return overwrite;
+ }
+
+ public static class Builder {
+
+ private StorageStrategy storageStrategy = StorageStrategy.DEFAULT;
+ private boolean overwrite;
+
+ public Builder storageStrategy(StorageStrategy storageStrategy) {
+ this.storageStrategy = storageStrategy;
+ return this;
+ }
+
+ public Builder overwrite() {
+ this.overwrite = true;
+ return this;
+ }
+
+ public Builder overwrite(boolean overwrite) {
+ this.overwrite = overwrite;
+ return this;
+ }
+
+ public StorageProperties build() {
+ return new StorageProperties(this);
+ }
+ }
+}
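Typical use of the builder, mirroring the handlers above (schemaProvider and properties assumed to be in scope):

  StorageProperties storageProperties = StorageProperties.builder()
      .storageStrategy(StorageStrategy.DEFAULT) // optional, defaults to DEFAULT
      .overwrite()                              // ALTER SCHEMA rewrites the existing file
      .build();
  schemaProvider.store("col1 int, col2 varchar", properties, storageProperties);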
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
index f4965ca..c83850e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
@@ -19,6 +19,7 @@ package org.apache.drill;
import org.apache.commons.io.FileUtils;
import org.apache.drill.categories.SqlTest;
+import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.types.TypeProtos;
@@ -42,6 +43,7 @@ import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -51,7 +53,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-@Category(SqlTest.class)
+@Category({SqlTest.class, UnlikelyTest.class})
public class TestSchemaCommands extends ClusterTest {
@Rule
@@ -751,4 +753,415 @@ public class TestSchemaCommands extends ClusterTest {
run("drop table if exists %s", table);
}
}
+
+ @Test
+ public void testAlterAddAbsentKeywords() throws Exception {
+ thrown.expect(UserException.class);
+ thrown.expectMessage("PARSE ERROR");
+ run("alter schema for table abc add");
+ }
+
+ @Test
+ public void testAlterAddAbsentSchemaForTable() throws Exception {
+ String tableName = "table_alter_schema_add_absent_schema";
+ String table = String.format("dfs.tmp.%s", tableName);
+ try {
+ run("create table %s as select 'a' as c from (values(1))", table);
+
+ thrown.expect(UserException.class);
+ thrown.expectMessage("RESOURCE ERROR: Schema does not exist");
+
+ run("alter schema for table %s add columns (col int)", table);
+ } finally {
+ run("drop table if exists %s", table);
+ }
+ }
+
+ @Test
+ public void testAlterAddAbsentSchemaPath() throws Exception {
+ thrown.expect(UserException.class);
+ thrown.expectMessage("RESOURCE ERROR: Schema does not exist");
+
+ run("alter schema path '%s' add columns (col int)",
+ new File(dirTestWatcher.getTmpDir(), "absent.schema").getPath());
+ }
+
+ @Test
+ public void testAlterAddDuplicateColumn() throws Exception {
+ File tmpDir = dirTestWatcher.getTmpDir();
+ File schemaFile = new File(tmpDir, "schema_for_duplicate_column.schema");
+ assertFalse(schemaFile.exists());
+ try {
+ run("create schema (col int) path '%s'", schemaFile.getPath());
+
+ thrown.expect(UserException.class);
+ thrown.expectMessage("VALIDATION ERROR");
+
+ run("alter schema path '%s' add columns (col varchar)", schemaFile.getPath());
+
+ } finally {
+ FileUtils.deleteQuietly(schemaFile);
+ }
+ }
+
+ @Test
+ public void testAlterAddDuplicateProperty() throws Exception {
+ File tmpDir = dirTestWatcher.getTmpDir();
+ File schemaFile = new File(tmpDir, "schema_for_duplicate_property.schema");
+ assertFalse(schemaFile.exists());
+ try {
+ run("create schema (col int) path '%s' properties ('prop' = 'a')", schemaFile.getPath());
+
+ thrown.expect(UserException.class);
+ thrown.expectMessage("VALIDATION ERROR");
+
+ run("alter schema path '%s' add properties ('prop' = 'b')", schemaFile.getPath());
+
+ } finally {
+ FileUtils.deleteQuietly(schemaFile);
+ }
+ }
+
+ @Test
+ public void testAlterAddColumns() throws Exception {
+ File tmpDir = dirTestWatcher.getTmpDir();
+ File schemaFile = new File(tmpDir, "alter_schema_add_columns.schema");
+ assertFalse(schemaFile.exists());
+ try {
+ run("create schema (col1 int) path '%s' properties ('prop1' = 'a')", schemaFile.getPath());
+
+ testBuilder()
+ .sqlQuery("alter schema path '%s' add " +
+ "columns (col2 varchar) ", schemaFile.getPath())
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath()))
+ .go();
+
+ SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath()));
+ TupleMetadata schema = schemaProvider.read().getSchema();
+
+ assertEquals(2, schema.size());
+
+ assertEquals("col1", schema.fullName(0));
+ assertEquals("col2", schema.fullName(1));
+
+ assertEquals(1, schema.properties().size());
+
+ } finally {
+ FileUtils.deleteQuietly(schemaFile);
+ }
+ }
+
+ @Test
+ public void testAlterAddProperties() throws Exception {
+ File tmpDir = dirTestWatcher.getTmpDir();
+ File schemaFile = new File(tmpDir, "alter_schema_add_properties.schema");
+ assertFalse(schemaFile.exists());
+ try {
+ run("create schema (col1 int) path '%s' properties ('prop1' = 'a')", schemaFile.getPath());
+
+ testBuilder()
+ .sqlQuery("alter schema path '%s' add " +
+ "properties ('prop2' = 'b', 'prop3' = 'c')", schemaFile.getPath())
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath()))
+ .go();
+
+ SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath()));
+ TupleMetadata schema = schemaProvider.read().getSchema();
+
+ assertEquals(1, schema.size());
+
+ Map<String, String> expectedProperties = new HashMap<>();
+ expectedProperties.put("prop1", "a");
+ expectedProperties.put("prop2", "b");
+ expectedProperties.put("prop3", "c");
+
+ assertEquals(expectedProperties, schema.properties());
+
+ } finally {
+ FileUtils.deleteQuietly(schemaFile);
+ }
+ }
+
+ @Test
+ public void testAlterAddSuccess() throws Exception {
+ File tmpDir = dirTestWatcher.getTmpDir();
+ File schemaFile = new File(tmpDir, "alter_schema_add_success.schema");
+ assertFalse(schemaFile.exists());
+ try {
+ run("create schema (col1 int) path '%s' properties ('prop1' = 'a')", schemaFile.getPath());
+
+ testBuilder()
+ .sqlQuery("alter schema path '%s' add " +
+ "columns (col2 varchar, col3 boolean) " +
+ "properties ('prop2' = 'b', 'prop3' = 'c')", schemaFile.getPath())
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath()))
+ .go();
+
+ SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath()));
+ TupleMetadata schema = schemaProvider.read().getSchema();
+
+ assertEquals(3, schema.size());
+
+ assertEquals("col1", schema.fullName(0));
+ assertEquals("col2", schema.fullName(1));
+ assertEquals("col3", schema.fullName(2));
+
+ Map<String, String> expectedProperties = new HashMap<>();
+ expectedProperties.put("prop1", "a");
+ expectedProperties.put("prop2", "b");
+ expectedProperties.put("prop3", "c");
+
+ assertEquals(expectedProperties, schema.properties());
+
+ } finally {
+ FileUtils.deleteQuietly(schemaFile);
+ }
+ }
+
+ @Test
+ public void testAlterAddForTable() throws Exception {
+ String tableName = "table_for_alter_add";
+ String table = String.format("dfs.tmp.%s", tableName);
+ try {
+ run("create table %s as select 'a' as c from (values(1))", table);
+
+ File schemaPath = Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(),
+ tableName, SchemaProvider.DEFAULT_SCHEMA_NAME).toFile();
+
+ run("create schema (col int) for table %s properties ('prop1' = 'a')", table);
+
+ testBuilder()
+ .sqlQuery("alter schema for table %s add or replace " +
+ "columns (col2 varchar, col3 boolean) " +
+ "properties ('prop2' = 'd', 'prop3' = 'c')", table)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Schema for [%s] was updated", table))
+ .go();
+
+ SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaPath.getPath()));
+ assertTrue(schemaProvider.exists());
+
+ TupleMetadata schema = schemaProvider.read().getSchema();
+ assertEquals(3, schema.size());
+ assertEquals(3, schema.properties().size());
+
+ } finally {
+ run("drop table if exists %s", table);
+ }
+ }
+
+ @Test
+ public void testAlterReplace() throws Exception {
+ File tmpDir = dirTestWatcher.getTmpDir();
+ File schemaFile = new File(tmpDir, "alter_schema_replace_success.schema");
+ assertFalse(schemaFile.exists());
+ try {
+ run("create schema (col1 int, col2 int) path '%s' " +
+ "properties ('prop1' = 'a', 'prop2' = 'b')", schemaFile.getPath());
+
+ testBuilder()
+ .sqlQuery("alter schema path '%s' add or replace " +
+ "columns (col2 varchar, col3 boolean) " +
+ "properties ('prop2' = 'd', 'prop3' = 'c')", schemaFile.getPath())
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath()))
+ .go();
+
+ SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath()));
+ TupleMetadata schema = schemaProvider.read().getSchema();
+
+ assertEquals(3, schema.size());
+
+ assertEquals("col1", schema.fullName(0));
+ assertEquals("col2", schema.fullName(1));
+ assertEquals(TypeProtos.MinorType.VARCHAR, schema.metadata("col2").type());
+ assertEquals("col3", schema.fullName(2));
+
+ Map<String, String> expectedProperties = new HashMap<>();
+ expectedProperties.put("prop1", "a");
+ expectedProperties.put("prop2", "d");
+ expectedProperties.put("prop3", "c");
+
+ assertEquals(expectedProperties, schema.properties());
+
+ } finally {
+ FileUtils.deleteQuietly(schemaFile);
+ }
+ }
+
+ @Test
+ public void testAlterRemoveAbsentKeywords() throws Exception {
+ thrown.expect(UserException.class);
+ thrown.expectMessage("PARSE ERROR");
+ run("alter schema for table abc remove");
+ }
+
+ @Test
+ public void testAlterRemoveAbsentSchemaForTable() throws Exception {
+ String tableName = "table_alter_schema_remove_absent_schema";
+ String table = String.format("dfs.tmp.%s", tableName);
+ try {
+ run("create table %s as select 'a' as c from (values(1))", table);
+
+ thrown.expect(UserException.class);
+ thrown.expectMessage("RESOURCE ERROR: Schema does not exist");
+
+ run("alter schema for table %s remove columns (col)", table);
+ } finally {
+ run("drop table if exists %s", table);
+ }
+ }
+
+ @Test
+ public void testAlterRemoveAbsentSchemaPath() throws Exception {
+ thrown.expect(UserException.class);
+ thrown.expectMessage("RESOURCE ERROR: Schema does not exist");
+
+ run("alter schema path '%s' remove columns (col)",
+ new File(dirTestWatcher.getTmpDir(), "absent.schema").getPath());
+ }
+
+ @Test
+ public void testAlterRemoveColumns() throws Exception {
+ File tmpDir = dirTestWatcher.getTmpDir();
+ File schemaFile = new File(tmpDir, "alter_schema_remove_columns.schema");
+ assertFalse(schemaFile.exists());
+ try {
+ run("create schema (col1 int, col2 varchar, col3 boolean, col4 int) path '%s' " +
+ "properties ('prop1' = 'a', 'prop2' = 'b', 'prop3' = 'c', 'prop4' = 'd')", schemaFile.getPath());
+
+ testBuilder()
+ .sqlQuery("alter schema path '%s' remove " +
+ "columns (col2, col4) ", schemaFile.getPath())
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath()))
+ .go();
+
+ SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath()));
+ TupleMetadata schema = schemaProvider.read().getSchema();
+
+ assertEquals(2, schema.size());
+
+ assertEquals("col1", schema.fullName(0));
+ assertEquals("col3", schema.fullName(1));
+
+ assertEquals(4, schema.properties().size());
+
+ } finally {
+ FileUtils.deleteQuietly(schemaFile);
+ }
+ }
+
+ @Test
+ public void testAlterRemoveProperties() throws Exception {
+ File tmpDir = dirTestWatcher.getTmpDir();
+ File schemaFile = new File(tmpDir, "alter_schema_remove_properties.schema");
+ assertFalse(schemaFile.exists());
+ try {
+ run("create schema (col1 int, col2 varchar, col3 boolean, col4 int) path '%s' " +
+ "properties ('prop1' = 'a', 'prop2' = 'b', 'prop3' = 'c', 'prop4' = 'd')", schemaFile.getPath());
+
+ testBuilder()
+ .sqlQuery("alter schema path '%s' remove " +
+ "properties ('prop2', 'prop4')", schemaFile.getPath())
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath()))
+ .go();
+
+ SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath()));
+ TupleMetadata schema = schemaProvider.read().getSchema();
+
+ assertEquals(4, schema.size());
+
+ Map<String, String> expectedProperties = new HashMap<>();
+ expectedProperties.put("prop1", "a");
+ expectedProperties.put("prop3", "c");
+
+ assertEquals(expectedProperties, schema.properties());
+
+ } finally {
+ FileUtils.deleteQuietly(schemaFile);
+ }
+ }
+
+ @Test
+ public void testAlterRemoveSuccess() throws Exception {
+ File tmpDir = dirTestWatcher.getTmpDir();
+ File schemaFile = new File(tmpDir, "alter_schema_remove_success.schema");
+ assertFalse(schemaFile.exists());
+ try {
+ run("create schema (col1 int, col2 varchar, col3 boolean, col4 int) path '%s' " +
+ "properties ('prop1' = 'a', 'prop2' = 'b', 'prop3' = 'c', 'prop4' = 'd')", schemaFile.getPath());
+
+ testBuilder()
+ .sqlQuery("alter schema path '%s' remove " +
+ "columns (col2, col4) " +
+ "properties ('prop2', 'prop4')", schemaFile.getPath())
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Schema for [%s] was updated", schemaFile.getPath()))
+ .go();
+
+ SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath()));
+ TupleMetadata schema = schemaProvider.read().getSchema();
+
+ assertEquals(2, schema.size());
+
+ assertEquals("col1", schema.fullName(0));
+ assertEquals("col3", schema.fullName(1));
+
+ Map<String, String> expectedProperties = new HashMap<>();
+ expectedProperties.put("prop1", "a");
+ expectedProperties.put("prop3", "c");
+
+ assertEquals(expectedProperties, schema.properties());
+
+ } finally {
+ FileUtils.deleteQuietly(schemaFile);
+ }
+ }
+
+ @Test
+ public void testAlterRemoveForTable() throws Exception {
+ String tableName = "table_for_alter_remove";
+ String table = String.format("dfs.tmp.%s", tableName);
+ try {
+ run("create table %s as select 'a' as c from (values(1))", table);
+
+ File schemaPath = Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(),
+ tableName, SchemaProvider.DEFAULT_SCHEMA_NAME).toFile();
+
+ run("create schema (col1 int, col2 varchar, col3 boolean, col4 int) for table %s " +
+ "properties ('prop1' = 'a', 'prop2' = 'b', 'prop3' = 'c', 'prop4' = 'd')", table);
+
+ testBuilder()
+ .sqlQuery("alter schema for table %s remove " +
+ "columns (col2, col4) " +
+ "properties ('prop2', 'prop4')", table)
+ .unOrdered()
+ .baselineColumns("ok", "summary")
+ .baselineValues(true, String.format("Schema for [%s] was updated", table))
+ .go();
+
+ SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaPath.getPath()));
+ assertTrue(schemaProvider.exists());
+
+ TupleMetadata schema = schemaProvider.read().getSchema();
+ assertEquals(2, schema.size());
+ assertEquals(2, schema.properties().size());
+
+ } finally {
+ run("drop table if exists %s", table);
+ }
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
index 47dffac..6502d83 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
@@ -869,4 +869,68 @@ public class TestTupleSchema extends SubOperatorTest {
assertNull(TupleMetadata.of(""));
assertNull(TupleMetadata.of(" "));
}
+
+ @Test
+ public void testCopy() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.BIGINT)
+ .build();
+
+ schema.setIntProperty("int_prop", 1);
+ schema.setProperty("string_prop", "A");
+
+ TupleMetadata copy = schema.copy();
+
+ assertTrue(schema.isEquivalent(copy));
+ assertEquals(schema.properties(), copy.properties());
+ }
+
+ @Test
+ public void testAddNewColumn() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.BIGINT)
+ .build();
+
+ int index = schema.addColumn(
+ MetadataUtils.newScalar("b",
+ MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL).build()));
+
+ assertEquals(1, index);
+ assertEquals(2, schema.size());
+ }
+
+ @Test
+ public void testAddNewProperty() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.BIGINT)
+ .build();
+
+ assertTrue(schema.properties().isEmpty());
+
+ schema.setIntProperty("int_prop", 1);
+ schema.setProperty("string_prop", "A");
+
+ assertEquals(2, schema.properties().size());
+ }
+
+ @Test
+ public void testRemoveProperty() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.BIGINT)
+ .build();
+
+ schema.setIntProperty("int_prop", 1);
+ schema.setProperty("string_prop", "A");
+ assertEquals(2, schema.properties().size());
+
+ schema.removeProperty("int_prop");
+ assertEquals(1, schema.properties().size());
+ assertNull(schema.property("int_prop"));
+ assertEquals("A", schema.property("string_prop"));
+
+ schema.removeProperty("missing_prop");
+ assertEquals(1, schema.properties().size());
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
index 03dcc3c..c07610d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.record.metadata.schema;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.store.StorageStrategy;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -69,7 +68,7 @@ public class TestSchemaProvider {
SchemaProvider provider = new InlineSchemaProvider("(i int)");
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage("Schema storage is not supported");
- provider.store("i int", null, StorageStrategy.DEFAULT);
+ provider.store("i int", null, StorageProperties.builder().build());
}
@Test
@@ -142,7 +141,7 @@ public class TestSchemaProvider {
properties.put("k2", "v2");
assertFalse(provider.exists());
- provider.store("i int, v varchar(10), s struct<s1 int, s2 varchar>", properties, StorageStrategy.DEFAULT);
+ provider.store("i int, v varchar(10), s struct<s1 int, s2 varchar>", properties, StorageProperties.builder().build());
assertTrue(provider.exists());
String expectedContent = "{\n"
@@ -186,7 +185,23 @@ public class TestSchemaProvider {
thrown.expect(IOException.class);
thrown.expectMessage("File already exists");
- provider.store("i int", null, StorageStrategy.DEFAULT);
+ provider.store("i int", null, StorageProperties.builder().build());
+ }
+
+ @Test
+ public void testPathProviderStoreInExistingFileOverwrite() throws Exception {
+ File schemaFile = folder.newFile("schema");
+ org.apache.hadoop.fs.Path schema = new org.apache.hadoop.fs.Path(schemaFile.getPath());
+ SchemaProvider provider = new PathSchemaProvider(schema);
+ assertTrue(provider.exists());
+
+ StorageProperties storageProperties = StorageProperties.builder()
+ .overwrite()
+ .build();
+ provider.store("i int", null, storageProperties);
+
+ TupleMetadata metadata = provider.read().getSchema();
+ assertEquals(1, metadata.size());
}
@Test
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
index 69926c4..6f7fadc 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
@@ -104,6 +104,7 @@ public abstract class AbstractColumnMetadata extends AbstractPropertied implemen
mode = from.mode;
precision = from.precision;
scale = from.scale;
+ setProperties(from.properties());
}
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
index b86b548..3e10a1f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
@@ -111,4 +111,9 @@ public class AbstractPropertied implements Propertied {
public void setIntProperty(String key, int value) {
setProperty(key, Integer.toString(value));
}
+
+ @Override
+ public void removeProperty(String key) {
+ setProperty(key, null);
+ }
}
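removeProperty above delegates to setProperty(key, null); this relies on setProperty treating a null value as a removal. A sketch of that presumed contract (the properties map name is illustrative):

  void setProperty(Map<String, String> properties, String key, String value) {
    if (value == null) {
      properties.remove(key); // null value removes the entry
    } else {
      properties.put(key, value);
    }
  }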
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
index 1597ab1..5ccbc62 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
@@ -28,9 +28,9 @@ public interface Propertied {
/**
* Base name for properties which Drill itself defines. Provides a
* separate "name space" from user-defined properties which should
- * have some other perfix.
+ * have some other prefix.
*/
- public static final String DRILL_PROP_PREFIX = "drill.";
+ String DRILL_PROP_PREFIX = "drill.";
/**
* Sets schema properties if not null.
@@ -50,6 +50,8 @@ public interface Propertied {
int intProperty(String key);
int intProperty(String key, int defaultValue);
void setIntProperty(String key, int value);
+ void removeProperty(String key);
+
/**
* Drill-wide properties are of the form:<br><tt>
@@ -65,7 +67,7 @@ public interface Propertied {
* @return the "drill.<plugin name>." prefix
*/
- public static String pluginPrefix(String pluginName) {
+ static String pluginPrefix(String pluginName) {
return DRILL_PROP_PREFIX + pluginName + ".";
}
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
index 97304c5..cb43c64 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
@@ -71,6 +71,7 @@ public class TupleSchema extends AbstractPropertied implements TupleMetadata {
for (ColumnMetadata md : this) {
tuple.addColumn(md.copy());
}
+ tuple.setProperties(properties());
return tuple;
}