Repository: tajo
Updated Branches:
  refs/heads/master 5defb26c4 -> 5af330d22


TAJO-1673: Implement recover partitions.

Closes #626


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5af330d2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5af330d2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5af330d2

Branch: refs/heads/master
Commit: 5af330d228991b9a2a7a9ce49979fd6feb84d464
Parents: 5defb26
Author: JaeHwa Jung <[email protected]>
Authored: Thu Sep 24 14:36:33 2015 +0900
Committer: JaeHwa Jung <[email protected]>
Committed: Thu Sep 24 14:36:33 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../apache/tajo/algebra/AlterTableOpType.java   |   2 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |  19 +++
 .../tajo/engine/query/TestAlterTable.java       |  95 +++++++++++++
 .../alter_table_drop_partition1.sql             |   2 +-
 .../alter_table_drop_partition2.sql             |   2 +-
 .../create_partitioned_table2.sql               |   2 +
 .../alter_table_repair_partition_1.sql          |   1 +
 .../alter_table_repair_partition_1.result       |   8 ++
 .../apache/tajo/master/exec/DDLExecutor.java    | 138 +++++++++++++++++--
 .../org/apache/tajo/parser/sql/SQLAnalyzer.java |   6 +
 .../main/sphinx/sql_language/alter_table.rst    |  19 ++-
 .../rewrite/rules/PartitionedTableRewriter.java |   4 +-
 .../plan/serder/LogicalNodeDeserializer.java    |   3 +
 .../tajo/plan/serder/LogicalNodeSerializer.java |   4 +
 tajo-plan/src/main/proto/Plan.proto             |   1 +
 .../org/apache/tajo/parser/sql/SQLLexer.g4      |   1 +
 .../org/apache/tajo/parser/sql/SQLParser.g4     |   2 +
 18 files changed, 296 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ea5f1de..433be87 100644
--- a/CHANGES
+++ b/CHANGES
@@ -681,6 +681,8 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1673: Implement recover partitions. (jaehwa)
+
     TAJO-1844: Eliminate explicit diamond expressions. 
     (Contributed by Dongkyu Hwangbo, committed by hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
----------------------------------------------------------------------
diff --git 
a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java 
b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
index 679ab4b..89daef0 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
@@ -18,5 +18,5 @@
 package org.apache.tajo.algebra;
 
 public enum AlterTableOpType {
-  RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION, 
SET_PROPERTY
+  RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION, 
SET_PROPERTY, REPAIR_PARTITION
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index e9220c3..5a0bd94 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -1261,6 +1261,25 @@ public class TestLogicalPlanner {
     return root.getChild();
   }
 
+  /**
+   * Verifies that {@code ALTER TABLE ... REPAIR PARTITION} is planned as an ALTER_TABLE node
+   * whose operation type is REPAIR_PARTITION and whose table name is the one given in the query.
+   */
+  @Test
+  public final void testAlterTableRepairPartiton() throws TajoException {
+    QueryContext qc = createQueryContext();
+
+    String sql = "ALTER TABLE table1 REPAIR PARTITION";
+    Expr expr = sqlAnalyzer.parse(sql);
+    LogicalPlan rootNode = planner.createPlan(qc, expr);
+    LogicalNode plan = rootNode.getRootBlock().getRoot();
+    testJsonSerDerObject(plan);
+    assertEquals(NodeType.ROOT, plan.getType());
+    LogicalRootNode root = (LogicalRootNode) plan;
+    assertEquals(NodeType.ALTER_TABLE, root.getChild().getType());
+
+    AlterTableNode alterTableNode = root.getChild();
+
+    // JUnit's contract is assertEquals(expected, actual); keep that order so that
+    // failure messages report the right value as "expected".
+    assertEquals(AlterTableOpType.REPAIR_PARTITION, alterTableNode.getAlterTableOpType());
+    assertEquals("table1", alterTableNode.getTableName());
+  }
+
   String [] ALTER_PARTITIONS = {
     "ALTER TABLE partitioned_table ADD PARTITION (col1 = 1 , col2 = 2) 
LOCATION 'hdfs://xxx" +
       ".com/warehouse/partitioned_table/col1=1/col2=2'", //0

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
index 8339ea7..d10c0f2 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
@@ -25,6 +25,10 @@ import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.exception.UndefinedDatabaseException;
+import org.apache.tajo.exception.UndefinedPartitionException;
+import org.apache.tajo.exception.UndefinedPartitionMethodException;
+import org.apache.tajo.exception.UndefinedTableException;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -110,5 +114,96 @@ public class TestAlterTable extends QueryTestCaseBase {
     assertNotNull(partitions);
     assertEquals(partitions.size(), 0);
     assertFalse(fs.exists(partitionPath));
+
+    catalog.dropTable(tableName);
+  }
+
+  @Test
+  public final void testAlterTableRepairPartition() throws Exception {
+    executeDDL("create_partitioned_table2.sql", null);
+
+    String simpleTableName = "partitioned_table2";
+    String tableName = CatalogUtil.buildFQName(getCurrentDatabase(), 
simpleTableName);
+    assertTrue(catalog.existsTable(tableName));
+
+    TableDesc tableDesc = catalog.getTableDesc(tableName);
+    assertEquals(tableDesc.getName(), tableName);
+    assertEquals(tableDesc.getPartitionMethod().getPartitionType(), 
CatalogProtos.PartitionType.COLUMN);
+    
assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns().size(),
 2);
+    
assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(),
 "col1");
+    
assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getColumn(1).getSimpleName(),
 "col2");
+
+    ResultSet res = executeString(
+      "insert overwrite into " + simpleTableName + " select l_quantity, 
l_returnflag, l_orderkey, l_partkey " +
+      " from default.lineitem");
+    res.close();
+
+    res = executeString("select * from " + simpleTableName + " order by col1, 
col2, col3, col4");
+    String result = resultSetToString(res);
+    String expectedResult = "col3,col4,col1,col2\n" +
+      "-------------------------------\n" +
+      "17.0,N,1,1\n" +
+      "36.0,N,1,1\n" +
+      "38.0,N,2,2\n" +
+      "45.0,R,3,2\n" +
+      "49.0,R,3,3\n";
+
+    res.close();
+    assertEquals(expectedResult, result);
+
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    Path tablePath = new Path(tableDesc.getUri());
+    FileSystem fs = tablePath.getFileSystem(conf);
+    assertTrue(fs.exists(new Path(tableDesc.getUri())));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3")));
+
+    // Remove all partitions
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 
1 , col2 = 1)").close();
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 
2 , col2 = 2)").close();
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 
3 , col2 = 2)").close();
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 
3 , col2 = 3)").close();
+
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 0);
+
+    assertTrue(fs.exists(new Path(tableDesc.getUri())));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3")));
+
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR 
PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    // Remove just one of existing partitions
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 
3 , col2 = 3)").close();
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR 
PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    // Remove a partition directory from filesystem
+    fs.delete(new Path(tablePath.toUri() + "/col1=3/col2=3"), true);
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR 
PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    // Add abnormal directories
+    assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/col10=1/col20=1")));
+    assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/col1=")));
+    assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/test")));
+    assertEquals(6, fs.listStatus(new Path(tablePath.toUri())).length);
+
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR 
PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+    catalog.dropTable(tableName);
+  }
+
+  /**
+   * Asserts that the catalog holds exactly {@code expectedCount} partitions for the given table.
+   *
+   * @param databaseName database that owns the table
+   * @param tableName simple (unqualified) table name
+   * @param expectedCount number of partitions the catalog is expected to return
+   */
+  private void verifyPartitionCount(String databaseName, String tableName, int expectedCount)
+    throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException,
+    UndefinedPartitionException {
+    List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions(databaseName, tableName);
+    assertNotNull(partitions);
+    // assertEquals takes (expected, actual); the original order inverted the failure message.
+    assertEquals(expectedCount, partitions.size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
 
b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
index b5d672f..cc4d6dd 100644
--- 
a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
+++ 
b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
@@ -1 +1 @@
-ALTER TABLE partitioned_table DROP PARTITION (col3 = 1 , col4 = 2)
\ No newline at end of file
+ALTER TABLE partitioned_table DROP PARTITION (col3 = 1 , col4 = 2) PURGE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
 
b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
index 0d4c932..452164b 100644
--- 
a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
+++ 
b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
@@ -1 +1 @@
-ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2)
\ No newline at end of file
+ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2) 
PURGE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
 
b/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
new file mode 100644
index 0000000..0fc8094
--- /dev/null
+++ 
b/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
@@ -0,0 +1,2 @@
+create table partitioned_table2 (col3 float8, col4 text) USING text  WITH 
('text.delimiter'='|')
+PARTITION by column(col1 int4, col2 int4)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
 
b/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
new file mode 100644
index 0000000..b65b0e6
--- /dev/null
+++ 
b/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
@@ -0,0 +1 @@
+ALTER TABLE table1 REPAIR PARTITION
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
 
b/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
new file mode 100644
index 0000000..daca3b3
--- /dev/null
+++ 
b/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
@@ -0,0 +1,8 @@
+{
+  "OldTableName": "table1",
+  "AlterTableType": "REPAIR_PARTITION",
+  "IsPurge": false,
+  "IfNotExists": false,
+  "IfExists": false,
+  "OpType": "AlterTable"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index 92e1775..dac99e5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -23,25 +23,31 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.tajo.algebra.AlterTableOpType;
 import org.apache.tajo.algebra.AlterTablespaceSetType;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.*;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.FileTablespace;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -529,24 +535,138 @@ public class DDLExecutor {
         catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, 
alterTable.getPartitionColumns(),
             alterTable.getPartitionValues(), alterTable.getLocation(), 
AlterTableType.DROP_PARTITION));
 
-        // When dropping partition on an managed table, the data will be 
delete from file system.
-        if (!desc.isExternal()) {
+        // When dropping a partition on a table, its data will NOT be deleted 
if the 'PURGE' option is not specified.
+        if (alterTable.isPurge()) {
           deletePartitionPath(partitionDescProto);
-        } else {
-          // When dropping partition on an external table, the data in the 
table will NOT be deleted from the file
-          // system. But if PURGE is specified, the partition data will be 
deleted.
-          if (alterTable.isPurge()) {
-            deletePartitionPath(partitionDescProto);
-          }
         }
       }
-
+      break;
+    case REPAIR_PARTITION:
+      repairPartition(context, queryContext, alterTable);
       break;
     default:
       throw new InternalError("alterTable cannot handle such query: \n" + 
alterTable.toJson());
     }
   }
 
+  /**
+   * Runs the {@code ALTER TABLE table_name REPAIR PARTITION} statement.
+   * <p>
+   * Lists the partition directories under the table directory and adds to the catalog every
+   * partition that exists on the file system but is not registered yet. Partitions that are
+   * registered but whose directory is missing are only reported at debug level; they are
+   * neither removed from the catalog nor recreated.
+   *
+   * @param context master context supplying the catalog and the configuration
+   * @param queryContext query context; its current database is used for unqualified table names
+   * @param alterTable ALTER TABLE node naming the table to repair
+   * @throws IOException if the file system cannot be accessed or a partition path is malformed
+   * @throws TajoException if a catalog operation fails; this method itself raises
+   *         UndefinedTableException for an unknown table and UndefinedPartitionMethodException
+   *         for a table without a partition method
+   */
+  public void repairPartition(TajoMaster.MasterContext context, final QueryContext queryContext,
+                         final AlterTableNode alterTable) throws IOException, TajoException {
+    final CatalogService catalog = context.getCatalog();
+    final String tableName = alterTable.getTableName();
+
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String[] split = CatalogUtil.splitFQTableName(tableName);
+      databaseName = split[0];
+      simpleTableName = split[1];
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+
+    if (!catalog.existsTable(databaseName, simpleTableName)) {
+      throw new UndefinedTableException(alterTable.getTableName());
+    }
+
+    TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
+
+    if (tableDesc.getPartitionMethod() == null) {
+      throw new UndefinedPartitionMethodException(simpleTableName);
+    }
+
+    Path tablePath = new Path(tableDesc.getUri());
+    FileSystem fs = tablePath.getFileSystem(context.getConf());
+
+    PartitionMethodDesc partitionDesc = tableDesc.getPartitionMethod();
+    Schema partitionColumns = partitionDesc.getExpressionSchema();
+
+    // Get the array of path filters (one per partition column), accepting all partition paths.
+    PathFilter[] filters = PartitionedTableRewriter.buildAllAcceptingPathFilters(partitionColumns);
+
+    // Descend one directory level per partition column: the first level is listed from the
+    // table directory, each following level from the directories accepted at the previous one.
+    Path [] filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(tablePath, filters[0]));
+    for (int i = 1; i < partitionColumns.size(); i++) {
+      filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(filteredPaths, filters[i]));
+    }
+
+    final boolean debugEnabled = LOG.isDebugEnabled();
+
+    // Collect the names of the partitions already registered in the catalog. A hash set keeps
+    // the membership test below constant-time instead of scanning a list per directory.
+    List<PartitionDescProto> existingPartitions = catalog.getPartitions(databaseName, simpleTableName);
+    java.util.Set<String> existingPartitionNames = new java.util.HashSet<String>();
+
+    for (PartitionDescProto existingPartition : existingPartitions) {
+      existingPartitionNames.add(existingPartition.getPartitionName());
+      // The existence check only feeds a debug message, so test the log level first and avoid
+      // one file system call per partition when debug logging is disabled.
+      if (debugEnabled && !fs.exists(new Path(existingPartition.getPath()))) {
+        LOG.debug("Partitions missing from Filesystem:" + existingPartition.getPartitionName());
+      }
+    }
+
+    // Find partitions that exist on the file system but are missing from the CatalogStore.
+    List<PartitionDescProto> targetPartitions = TUtil.newList();
+    for (Path filteredPath : filteredPaths) {
+      PartitionDescProto targetPartition = getPartitionDesc(simpleTableName, filteredPath);
+      if (!existingPartitionNames.contains(targetPartition.getPartitionName())) {
+        if (debugEnabled) {
+          LOG.debug("Partitions not in CatalogStore:" + targetPartition.getPartitionName());
+        }
+        targetPartitions.add(targetPartition);
+      }
+    }
+
+    catalog.addPartitions(databaseName, simpleTableName, targetPartitions, true);
+
+    if (debugEnabled) {
+      for (PartitionDescProto targetPartition : targetPartitions) {
+        LOG.debug("Repair: Added partition to CatalogStore " + tableName + ":" + targetPartition.getPartitionName());
+      }
+    }
+
+    LOG.info("Total added partitions to CatalogStore: " + targetPartitions.size());
+  }
+
+  /**
+   * Builds a partition descriptor from a partition directory path.
+   * <p>
+   * The partition name is the part of the unescaped path that follows the table name and one
+   * separator character, e.g. {@code col1=1/col2=2}. Each {@code column=value} component of
+   * that name becomes one partition key. The descriptor's path is the path as given.
+   *
+   * @param tableName simple table name, expected to occur in {@code path}
+   * @param path partition directory
+   * @return the descriptor holding the partition name, its keys and its path
+   * @throws IOException if the table name cannot be found in the path or a path component is
+   *         not of the form {@code column=value}
+   */
+  private PartitionDescProto getPartitionDesc(String tableName, Path path) throws IOException {
+    String partitionPath = path.toString();
+
+    // Search and slice the SAME string. The original located the table name in the escaped
+    // path but cut the unescaped one, which is shorter whenever the path contains escape
+    // sequences and then yields a wrong name or a StringIndexOutOfBoundsException.
+    String unescapedPath = StringUtils.unescapePathName(partitionPath);
+    int tableIndex = unescapedPath.indexOf(tableName);
+    int nameStart = tableIndex + tableName.length() + 1; // + 1 skips the separator after the table name
+    if (tableIndex < 0 || nameStart > unescapedPath.length()) {
+      throw new IOException("Cannot find table '" + tableName + "' in partition path: " + partitionPath);
+    }
+    String partitionName = unescapedPath.substring(nameStart);
+
+    CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
+    builder.setPartitionName(partitionName);
+
+    for (String partitionKeyPair : partitionName.split("/")) {
+      // Split on the first '=' only, so that a value which itself contains '=' is kept whole.
+      String[] keyAndValue = partitionKeyPair.split("=", 2);
+      if (keyAndValue.length < 2) {
+        throw new IOException("Malformed partition directory '" + partitionKeyPair + "' in path: " + partitionPath);
+      }
+
+      PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder();
+      keyBuilder.setColumnName(keyAndValue[0]);
+      keyBuilder.setPartitionValue(keyAndValue[1]);
+
+      builder.addPartitionKeys(keyBuilder.build());
+    }
+
+    builder.setPath(partitionPath);
+
+    return builder.build();
+  }
+
+
   private void deletePartitionPath(CatalogProtos.PartitionDescProto 
partitionDescProto) throws IOException {
     Path partitionPath = new Path(partitionDescProto.getPath());
     FileSystem fs = partitionPath.getFileSystem(context.getConf());

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java 
b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
index 4526144..717366a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
@@ -1947,6 +1947,7 @@ public class SQLAnalyzer extends 
SQLParserBaseVisitor<Expr> {
     final int PARTITION_MASK = 00000020;
     final int SET_MASK = 00000002;
     final int PROPERTY_MASK = 00010000;
+    final int REPAIR_MASK = 00000003;
 
     int val = 00000000;
 
@@ -1978,6 +1979,9 @@ public class SQLAnalyzer extends 
SQLParserBaseVisitor<Expr> {
           case PROPERTY:
             val = val | PROPERTY_MASK;
             break;
+          case REPAIR:
+            val = val | REPAIR_MASK;
+            break;
           default:
             break;
         }
@@ -1989,6 +1993,8 @@ public class SQLAnalyzer extends 
SQLParserBaseVisitor<Expr> {
   private AlterTableOpType evaluateAlterTableOperationTye(final int value) {
 
     switch (value) {
+      case 19:
+        return AlterTableOpType.REPAIR_PARTITION;
       case 65:
         return AlterTableOpType.RENAME_TABLE;
       case 73:

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst 
b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
index ffc34d1..959ebcc 100644
--- a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
+++ b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
@@ -96,4 +96,21 @@ You can use ``ALTER TABLE ADD PARTITION`` to add partitions 
to a table. The loca
   ALTER TABLE table1 DROP PARTITION (col1 = '2015' , col2 = '01', col3 = '11' )
   ALTER TABLE table1 DROP PARTITION (col1 = 'TAJO' ) PURGE
 
-You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. 
This removes the data for a managed table and this doesn't remove the data for 
an external table. But if ``PURGE`` is specified for an external table, the 
partition data will be removed. The metadata is completely lost in all cases. 
An error is thrown if the partition for the table doesn't exists. You can use 
``IF EXISTS`` to skip the error.
+You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. 
This doesn't remove the data for a table. But if ``PURGE`` is specified, the 
partition data will be removed. The metadata is completely lost in all cases. 
An error is thrown if the partition for the table doesn't exist. You can use 
``IF EXISTS`` to skip the error.
+
+========================
+REPAIR PARTITION
+========================
+
+Tajo stores a list of partitions for each table in its catalog. If partition directories are manually added to the distributed file system, the catalog is not aware of these partitions. Running the ``ALTER TABLE REPAIR PARTITION`` statement scans the table directory and adds to the catalog the partitions that exist in the file system but are not registered yet.
+
+*Synopsis*
+
+.. code-block:: sql
+
+  ALTER TABLE <table_name> REPAIR PARTITION
+
+.. note::
+
+  If a partition is registered in the catalog but its directory does not exist in the file system, Tajo does not recover it: the directory is not recreated and the catalog entry is left unchanged.
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
index b5cd42b..5123fc4 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -199,7 +199,7 @@ public class PartitionedTableRewriter implements 
LogicalPlanRewriteRule {
    * @param partitionColumns The partition columns schema
    * @return The array of path filters, accepting all partition paths.
    */
-  private static PathFilter [] buildAllAcceptingPathFilters(Schema 
partitionColumns) {
+  public static PathFilter [] buildAllAcceptingPathFilters(Schema 
partitionColumns) {
     Column target;
     PathFilter [] filters = new PathFilter[partitionColumns.size()];
     List<EvalNode> accumulatedFilters = Lists.newArrayList();
@@ -214,7 +214,7 @@ public class PartitionedTableRewriter implements 
LogicalPlanRewriteRule {
     return filters;
   }
 
-  private static Path [] toPathArray(FileStatus[] fileStatuses) {
+  public static Path [] toPathArray(FileStatus[] fileStatuses) {
     Path [] paths = new Path[fileStatuses.length];
     for (int j = 0; j < fileStatuses.length; j++) {
       paths[j] = fileStatuses[j].getPath();

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index 608fa4c..c75c3fd 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -650,6 +650,9 @@ public class LogicalNodeDeserializer {
       alterTable.setPurge(alterPartition.getPurge());
       alterTable.setIfExists(alterPartition.getIfExists());
       break;
+    case REPAIR_PARTITION:
+      alterTable.setTableName(alterTableProto.getTableName());
+      break;
     default:
       throw new TajoRuntimeException(
           new NotImplementedException("Unknown SET type in ALTER TABLE: " + 
alterTableProto.getSetType().name()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
index a0f1fcc..3cf7d9e 100644
--- 
a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
+++ 
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -633,6 +633,10 @@ public class LogicalNodeSerializer extends 
BasicLogicalPlanVisitor<LogicalNodeSe
       partitionBuilder.setPurge(node.isPurge());
       alterTableBuilder.setAlterPartition(partitionBuilder);
       break;
+    case REPAIR_PARTITION:
+      
alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.REPAIR_PARTITION);
+      alterTableBuilder.setTableName(node.getTableName());
+      break;
     default:
       throw new TajoRuntimeException(
           new NotImplementedException("Unknown SET type in ALTER TABLE: " + 
node.getAlterTableOpType().name()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto 
b/tajo-plan/src/main/proto/Plan.proto
index 8a8ecb1..fa1deeb 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -302,6 +302,7 @@ message AlterTableNode {
     SET_PROPERTY = 3;
     ADD_PARTITION = 4;
     DROP_PARTITION = 5;
+    REPAIR_PARTITION = 6;    
   }
 
   message RenameTable {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
----------------------------------------------------------------------
diff --git 
a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4 
b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
index 896f627..ee61320 100644
--- a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
+++ b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
@@ -290,6 +290,7 @@ RANK : R A N K;
 RECORD : R E C O R D;
 REGEXP : R E G E X P;
 RENAME : R E N A M E;
+REPAIR : R E P A I R;
 RESET : R E S E T;
 RLIKE : R L I K E;
 ROLLUP : R O L L U P;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
----------------------------------------------------------------------
diff --git 
a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4 
b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
index c125352..e2693ea 100644
--- a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
+++ b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
@@ -308,6 +308,7 @@ nonreserved_keywords
   | RECORD
   | REGEXP
   | RENAME
+  | REPAIR
   | RESET
   | RLIKE
   | ROLLUP
@@ -1624,6 +1625,7 @@ alter_table_statement
   | ALTER TABLE table_name ADD (if_not_exists)? PARTITION LEFT_PAREN 
partition_column_value_list RIGHT_PAREN (LOCATION 
path=Character_String_Literal)?
   | ALTER TABLE table_name DROP (if_exists)? PARTITION LEFT_PAREN 
partition_column_value_list RIGHT_PAREN (PURGE)?
   | ALTER TABLE table_name SET PROPERTY property_list
+  | ALTER TABLE table_name REPAIR PARTITION
   ;
 
 partition_column_value_list

Reply via email to