Repository: tajo Updated Branches: refs/heads/branch-0.11.0 a9bf62008 -> fcc0c030c
TAJO-1673: Implement recover partitions. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fcc0c030 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fcc0c030 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fcc0c030 Branch: refs/heads/branch-0.11.0 Commit: fcc0c030c5fcd3f72d46b225decd75ee668bfd7c Parents: a9bf620 Author: JaeHwa Jung <[email protected]> Authored: Thu Sep 24 14:45:01 2015 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Thu Sep 24 14:45:01 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../apache/tajo/algebra/AlterTableOpType.java | 2 +- .../tajo/engine/planner/TestLogicalPlanner.java | 19 +++ .../tajo/engine/query/TestAlterTable.java | 95 +++++++++++++ .../alter_table_drop_partition1.sql | 2 +- .../alter_table_drop_partition2.sql | 2 +- .../create_partitioned_table2.sql | 2 + .../alter_table_repair_partition_1.sql | 1 + .../alter_table_repair_partition_1.result | 8 ++ .../apache/tajo/master/exec/DDLExecutor.java | 138 +++++++++++++++++-- .../org/apache/tajo/parser/sql/SQLAnalyzer.java | 6 + .../main/sphinx/sql_language/alter_table.rst | 19 ++- .../rewrite/rules/PartitionedTableRewriter.java | 4 +- .../plan/serder/LogicalNodeDeserializer.java | 3 + .../tajo/plan/serder/LogicalNodeSerializer.java | 4 + tajo-plan/src/main/proto/Plan.proto | 1 + .../org/apache/tajo/parser/sql/SQLLexer.g4 | 1 + .../org/apache/tajo/parser/sql/SQLParser.g4 | 2 + 18 files changed, 296 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index f2f1352..8bc7409 100644 --- a/CHANGES +++ b/CHANGES @@ -572,6 +572,8 @@ Release 0.11.0 - unreleased TASKS + TAJO-1673: Implement recover partitions. 
(jaehwa) + TAJO-1872: Increase the minimum split size and add a classpath to hadoop tools. (jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java ---------------------------------------------------------------------- diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java index 679ab4b..89daef0 100644 --- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java +++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java @@ -18,5 +18,5 @@ package org.apache.tajo.algebra; public enum AlterTableOpType { - RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION, SET_PROPERTY + RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION, SET_PROPERTY, REPAIR_PARTITION } http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 38d02aa..6e61657 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -1261,6 +1261,25 @@ public class TestLogicalPlanner { return root.getChild(); } + @Test + public final void testAlterTableRepairPartiton() throws TajoException { + QueryContext qc = createQueryContext(); + + String sql = "ALTER TABLE table1 REPAIR PARTITION"; + Expr expr = sqlAnalyzer.parse(sql); + LogicalPlan rootNode = planner.createPlan(qc, expr); + LogicalNode plan = rootNode.getRootBlock().getRoot(); + testJsonSerDerObject(plan); + 
assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + assertEquals(NodeType.ALTER_TABLE, root.getChild().getType()); + + AlterTableNode msckNode = root.getChild(); + + assertEquals(msckNode.getAlterTableOpType(), AlterTableOpType.REPAIR_PARTITION); + assertEquals(msckNode.getTableName(), "table1"); + } + String [] ALTER_PARTITIONS = { "ALTER TABLE partitioned_table ADD PARTITION (col1 = 1 , col2 = 2) LOCATION 'hdfs://xxx" + ".com/warehouse/partitioned_table/col1=1/col2=2'", //0 http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java index 8339ea7..d10c0f2 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java @@ -25,6 +25,10 @@ import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.exception.UndefinedDatabaseException; +import org.apache.tajo.exception.UndefinedPartitionException; +import org.apache.tajo.exception.UndefinedPartitionMethodException; +import org.apache.tajo.exception.UndefinedTableException; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -110,5 +114,96 @@ public class TestAlterTable extends QueryTestCaseBase { assertNotNull(partitions); assertEquals(partitions.size(), 0); assertFalse(fs.exists(partitionPath)); + + catalog.dropTable(tableName); + } + + @Test + public final void testAlterTableRepairPartition() throws Exception { + executeDDL("create_partitioned_table2.sql", null); + + String 
simpleTableName = "partitioned_table2"; + String tableName = CatalogUtil.buildFQName(getCurrentDatabase(), simpleTableName); + assertTrue(catalog.existsTable(tableName)); + + TableDesc tableDesc = catalog.getTableDesc(tableName); + assertEquals(tableDesc.getName(), tableName); + assertEquals(tableDesc.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.COLUMN); + assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns().size(), 2); + assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(), "col1"); + assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getColumn(1).getSimpleName(), "col2"); + + ResultSet res = executeString( + "insert overwrite into " + simpleTableName + " select l_quantity, l_returnflag, l_orderkey, l_partkey " + + " from default.lineitem"); + res.close(); + + res = executeString("select * from " + simpleTableName + " order by col1, col2, col3, col4"); + String result = resultSetToString(res); + String expectedResult = "col3,col4,col1,col2\n" + + "-------------------------------\n" + + "17.0,N,1,1\n" + + "36.0,N,1,1\n" + + "38.0,N,2,2\n" + + "45.0,R,3,2\n" + + "49.0,R,3,3\n"; + + res.close(); + assertEquals(expectedResult, result); + + verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4); + + Path tablePath = new Path(tableDesc.getUri()); + FileSystem fs = tablePath.getFileSystem(conf); + assertTrue(fs.exists(new Path(tableDesc.getUri()))); + assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=1/col2=1"))); + assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=2/col2=2"))); + assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2"))); + assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3"))); + + // Remove all partitions + executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 1 , col2 = 1)").close(); + executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 
= 2 , col2 = 2)").close(); + executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 3 , col2 = 2)").close(); + executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 3 , col2 = 3)").close(); + + verifyPartitionCount(getCurrentDatabase(), simpleTableName, 0); + + assertTrue(fs.exists(new Path(tableDesc.getUri()))); + assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=1/col2=1"))); + assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=2/col2=2"))); + assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2"))); + assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3"))); + + executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close(); + verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4); + + // Remove just one of existing partitions + executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 3 , col2 = 3)").close(); + executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close(); + verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4); + + // Remove a partition directory from filesystem + fs.delete(new Path(tablePath.toUri() + "/col1=3/col2=3"), true); + executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close(); + verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4); + + // Add abnormal directories + assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/col10=1/col20=1"))); + assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/col1="))); + assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/test"))); + assertEquals(6, fs.listStatus(new Path(tablePath.toUri())).length); + + executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close(); + verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4); + catalog.dropTable(tableName); + } + + private void verifyPartitionCount(String databaseName, String tableName, int expectedCount) + throws UndefinedDatabaseException, 
UndefinedTableException, UndefinedPartitionMethodException, + UndefinedPartitionException { + List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions(databaseName, tableName); + assertNotNull(partitions); + assertEquals(partitions.size(), expectedCount); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql index b5d672f..cc4d6dd 100644 --- a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql +++ b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql @@ -1 +1 @@ -ALTER TABLE partitioned_table DROP PARTITION (col3 = 1 , col4 = 2) \ No newline at end of file +ALTER TABLE partitioned_table DROP PARTITION (col3 = 1 , col4 = 2) PURGE \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql index 0d4c932..452164b 100644 --- a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql +++ b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql @@ -1 +1 @@ -ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2) \ No newline at end of file +ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2) PURGE \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql b/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql new file mode 100644 index 0000000..0fc8094 --- /dev/null +++ b/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql @@ -0,0 +1,2 @@ +create table partitioned_table2 (col3 float8, col4 text) USING text WITH ('text.delimiter'='|') +PARTITION by column(col1 int4, col2 int4) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql b/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql new file mode 100644 index 0000000..b65b0e6 --- /dev/null +++ b/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql @@ -0,0 +1 @@ +ALTER TABLE table1 REPAIR PARTITION \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result b/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result new file mode 100644 index 0000000..daca3b3 --- /dev/null +++ 
b/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result @@ -0,0 +1,8 @@ +{ + "OldTableName": "table1", + "AlterTableType": "REPAIR_PARTITION", + "IsPurge": false, + "IfNotExists": false, + "IfExists": false, + "OpType": "AlterTable" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index 15abf9e..a67a625 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -23,25 +23,31 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.tajo.algebra.AlterTableOpType; import org.apache.tajo.algebra.AlterTablespaceSetType; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.*; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.FileTablespace; import 
org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.TUtil; import java.io.IOException; import java.util.ArrayList; @@ -529,24 +535,138 @@ public class DDLExecutor { catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(), alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.DROP_PARTITION)); - // When dropping partition on an managed table, the data will be delete from file system. - if (!desc.isExternal()) { + // When dropping a partition on a table, its data will NOT be deleted if the 'PURGE' option is not specified. + if (alterTable.isPurge()) { deletePartitionPath(partitionDescProto); - } else { - // When dropping partition on an external table, the data in the table will NOT be deleted from the file - // system. But if PURGE is specified, the partition data will be deleted. - if (alterTable.isPurge()) { - deletePartitionPath(partitionDescProto); - } } } - + break; + case REPAIR_PARTITION: + repairPartition(context, queryContext, alterTable); break; default: throw new InternalError("alterTable cannot handle such query: \n" + alterTable.toJson()); } } + /** + * Run ALTER TABLE table_name REPAIR PARTITION statement. + * This will recover all partitions which exist in the table directory. 
+ * + * + * @param context + * @param queryContext + * @param alterTable + * @throws IOException + */ + public void repairPartition(TajoMaster.MasterContext context, final QueryContext queryContext, + final AlterTableNode alterTable) throws IOException, TajoException { + final CatalogService catalog = context.getCatalog(); + final String tableName = alterTable.getTableName(); + + String databaseName; + String simpleTableName; + if (CatalogUtil.isFQTableName(tableName)) { + String[] split = CatalogUtil.splitFQTableName(tableName); + databaseName = split[0]; + simpleTableName = split[1]; + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleTableName = tableName; + } + + if (!catalog.existsTable(databaseName, simpleTableName)) { + throw new UndefinedTableException(alterTable.getTableName()); + } + + TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName); + + if(tableDesc.getPartitionMethod() == null) { + throw new UndefinedPartitionMethodException(simpleTableName); + } + + Path tablePath = new Path(tableDesc.getUri()); + FileSystem fs = tablePath.getFileSystem(context.getConf()); + + PartitionMethodDesc partitionDesc = tableDesc.getPartitionMethod(); + Schema partitionColumns = partitionDesc.getExpressionSchema(); + + // Get the array of path filter, accepting all partition paths. + PathFilter[] filters = PartitionedTableRewriter.buildAllAcceptingPathFilters(partitionColumns); + + // loop from one to the number of partition columns + Path [] filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(tablePath, filters[0])); + + // Get all file status matched to a ith level path filter. 
+ for (int i = 1; i < partitionColumns.size(); i++) { + filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(filteredPaths, filters[i])); + } + + // Find missing partitions from filesystem + List<PartitionDescProto> existingPartitions = catalog.getPartitions(databaseName, simpleTableName); + List<String> existingPartitionNames = TUtil.newList(); + Path existingPartitionPath = null; + + for(PartitionDescProto existingPartition : existingPartitions) { + existingPartitionPath = new Path(existingPartition.getPath()); + existingPartitionNames.add(existingPartition.getPartitionName()); + if (!fs.exists(existingPartitionPath) && LOG.isDebugEnabled()) { + LOG.debug("Partitions missing from Filesystem:" + existingPartition.getPartitionName()); + } + } + + // Find missing partitions from CatalogStore + List<PartitionDescProto> targetPartitions = TUtil.newList(); + for(Path filteredPath : filteredPaths) { + PartitionDescProto targetPartition = getPartitionDesc(simpleTableName, filteredPath); + if (!existingPartitionNames.contains(targetPartition.getPartitionName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Partitions not in CatalogStore:" + targetPartition.getPartitionName()); + } + targetPartitions.add(targetPartition); + } + } + + catalog.addPartitions(databaseName, simpleTableName, targetPartitions, true); + + if (LOG.isDebugEnabled()) { + for(PartitionDescProto targetPartition: targetPartitions) { + LOG.debug("Repair: Added partition to CatalogStore " + tableName + ":" + targetPartition.getPartitionName()); + } + } + + LOG.info("Total added partitions to CatalogStore: " + targetPartitions.size()); + } + + private PartitionDescProto getPartitionDesc(String tableName, Path path) throws IOException { + String partitionPath = path.toString(); + + String partitionName = StringUtils.unescapePathName(partitionPath); + int startIndex = partitionPath.indexOf(tableName); + partitionName = partitionName.substring(startIndex + tableName.length() + 1, 
partitionPath.length()); + + CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + + String[] partitionKeyPairs = partitionName.split("/"); + + for(int i = 0; i < partitionKeyPairs.length; i++) { + String partitionKeyPair = partitionKeyPairs[i]; + String[] split = partitionKeyPair.split("="); + + PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder(); + keyBuilder.setColumnName(split[0]); + keyBuilder.setPartitionValue(split[1]); + + builder.addPartitionKeys(keyBuilder.build()); + } + + builder.setPath(partitionPath); + + return builder.build(); + } + + private void deletePartitionPath(CatalogProtos.PartitionDescProto partitionDescProto) throws IOException { Path partitionPath = new Path(partitionDescProto.getPath()); FileSystem fs = partitionPath.getFileSystem(context.getConf()); http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java index 6190cdc..d5eafb3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java @@ -1947,6 +1947,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { final int PARTITION_MASK = 00000020; final int SET_MASK = 00000002; final int PROPERTY_MASK = 00010000; + final int REPAIR_MASK = 00000003; int val = 00000000; @@ -1978,6 +1979,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { case PROPERTY: val = val | PROPERTY_MASK; break; + case REPAIR: + val = val | REPAIR_MASK; + break; default: break; } @@ -1989,6 +1993,8 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { private AlterTableOpType 
evaluateAlterTableOperationTye(final int value) { switch (value) { + case 19: + return AlterTableOpType.REPAIR_PARTITION; case 65: return AlterTableOpType.RENAME_TABLE; case 73: http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-docs/src/main/sphinx/sql_language/alter_table.rst ---------------------------------------------------------------------- diff --git a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst index ffc34d1..959ebcc 100644 --- a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst +++ b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst @@ -96,4 +96,21 @@ You can use ``ALTER TABLE ADD PARTITION`` to add partitions to a table. The loca ALTER TABLE table1 DROP PARTITION (col1 = '2015' , col2 = '01', col3 = '11' ) ALTER TABLE table1 DROP PARTITION (col1 = 'TAJO' ) PURGE -You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. This removes the data for a managed table and this doesn't remove the data for an external table. But if ``PURGE`` is specified for an external table, the partition data will be removed. The metadata is completely lost in all cases. An error is thrown if the partition for the table doesn't exists. You can use ``IF EXISTS`` to skip the error. +You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. This doesn't remove the data for a table. But if ``PURGE`` is specified, the partition data will be removed. The metadata is completely lost in all cases. An error is thrown if the partition for the table doesn't exist. You can use ``IF EXISTS`` to skip the error. + +======================== +REPAIR PARTITION +======================== + +Tajo stores a list of partitions for each table in its catalog. If partitions are manually added to the distributed file system, the metastore is not aware of these partitions. Running the ``ALTER TABLE REPAIR PARTITION`` statement ensures that the tables are properly populated. 
+ +*Synopsis* + +.. code-block:: sql + + ALTER TABLE <table_name> REPAIR PARTITION + +.. note:: + + Even if information about a partition is stored in the catalog, Tajo does not recover the partition when its directory doesn't exist in the file system. + http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index b5cd42b..5123fc4 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -199,7 +199,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { * @param partitionColumns The partition columns schema * @return The array of path filter, accpeting all partition paths. 
*/ - private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) { + public static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) { Column target; PathFilter [] filters = new PathFilter[partitionColumns.size()]; List<EvalNode> accumulatedFilters = Lists.newArrayList(); @@ -214,7 +214,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { return filters; } - private static Path [] toPathArray(FileStatus[] fileStatuses) { + public static Path [] toPathArray(FileStatus[] fileStatuses) { Path [] paths = new Path[fileStatuses.length]; for (int j = 0; j < fileStatuses.length; j++) { paths[j] = fileStatuses[j].getPath(); http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index 608fa4c..c75c3fd 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -650,6 +650,9 @@ public class LogicalNodeDeserializer { alterTable.setPurge(alterPartition.getPurge()); alterTable.setIfExists(alterPartition.getIfExists()); break; + case REPAIR_PARTITION: + alterTable.setTableName(alterTableProto.getTableName()); + break; default: throw new TajoRuntimeException( new NotImplementedException("Unknown SET type in ALTER TABLE: " + alterTableProto.getSetType().name())); http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index a0f1fcc..3cf7d9e 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -633,6 +633,10 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe partitionBuilder.setPurge(node.isPurge()); alterTableBuilder.setAlterPartition(partitionBuilder); break; + case REPAIR_PARTITION: + alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.REPAIR_PARTITION); + alterTableBuilder.setTableName(node.getTableName()); + break; default: throw new TajoRuntimeException( new NotImplementedException("Unknown SET type in ALTER TABLE: " + node.getAlterTableOpType().name())); http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index 8a8ecb1..fa1deeb 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -302,6 +302,7 @@ message AlterTableNode { SET_PROPERTY = 3; ADD_PARTITION = 4; DROP_PARTITION = 5; + REPAIR_PARTITION = 6; } message RenameTable { http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4 ---------------------------------------------------------------------- diff --git a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4 b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4 index 896f627..ee61320 100644 --- a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4 +++ b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4 @@ -290,6 +290,7 @@ RANK : R A N K; RECORD : R E C O R D; REGEXP : R E G E X P; RENAME : R E N A M E; +REPAIR : R E P A I R; RESET : R E S E 
T; RLIKE : R L I K E; ROLLUP : R O L L U P; http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4 ---------------------------------------------------------------------- diff --git a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4 b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4 index c125352..e2693ea 100644 --- a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4 +++ b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4 @@ -308,6 +308,7 @@ nonreserved_keywords | RECORD | REGEXP | RENAME + | REPAIR | RESET | RLIKE | ROLLUP @@ -1624,6 +1625,7 @@ alter_table_statement | ALTER TABLE table_name ADD (if_not_exists)? PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (LOCATION path=Character_String_Literal)? | ALTER TABLE table_name DROP (if_exists)? PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (PURGE)? | ALTER TABLE table_name SET PROPERTY property_list + | ALTER TABLE table_name REPAIR PARTITION ; partition_column_value_list
