[
https://issues.apache.org/jira/browse/TAJO-1673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14903006#comment-14903006
]
ASF GitHub Bot commented on TAJO-1673:
--------------------------------------
Github user jihoonson commented on a diff in the pull request:
https://github.com/apache/tajo/pull/626#discussion_r40113680
--- Diff:
tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---
@@ -529,24 +536,137 @@ public void alterTable(TajoMaster.MasterContext
context, final QueryContext quer
catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName,
alterTable.getPartitionColumns(),
alterTable.getPartitionValues(), alterTable.getLocation(),
AlterTableType.DROP_PARTITION));
- // When dropping a partition on a managed table, the data will be
deleted from the file system.
- if (!desc.isExternal()) {
+ // When dropping partition on a table, the data in the table will
NOT be deleted from the file system.
+ // But if PURGE is specified, the partition data will be deleted.
+ if (alterTable.isPurge()) {
deletePartitionPath(partitionDescProto);
- } else {
- // When dropping partition on an external table, the data in the
table will NOT be deleted from the file
- // system. But if PURGE is specified, the partition data will be
deleted.
- if (alterTable.isPurge()) {
- deletePartitionPath(partitionDescProto);
- }
}
}
-
+ break;
+ case REPAIR_PARTITION:
+ repairPartition(context, queryContext, alterTable);
break;
default:
throw new InternalError("alterTable cannot handle such query: \n" +
alterTable.toJson());
}
}
+ /**
+ * Run ALTER TABLE table_name REPAIR TABLE statement.
+ * This will recover all partitions which exist in the table directory.
+ *
+ *
+ * @param context
+ * @param queryContext
+ * @param alterTable
+ * @throws IOException
+ */
+ public void repairPartition(TajoMaster.MasterContext context, final
QueryContext queryContext,
+ final AlterTableNode alterTable) throws
IOException, TajoException {
+ final CatalogService catalog = context.getCatalog();
+ final String tableName = alterTable.getTableName();
+
+ String databaseName;
+ String simpleTableName;
+ if (CatalogUtil.isFQTableName(tableName)) {
+ String[] split = CatalogUtil.splitFQTableName(tableName);
+ databaseName = split[0];
+ simpleTableName = split[1];
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleTableName = tableName;
+ }
+
+ if (!catalog.existsTable(databaseName, simpleTableName)) {
+ throw new UndefinedTableException(alterTable.getTableName());
+ }
+
+ TableDesc tableDesc = catalog.getTableDesc(databaseName,
simpleTableName);
+
+ if(tableDesc.getPartitionMethod() == null) {
+ throw new UndefinedPartitionMethodException(simpleTableName);
+ }
+
+ Path tablePath = new Path(tableDesc.getUri());
+ FileSystem fs = tablePath.getFileSystem(context.getConf());
+
+ PartitionMethodDesc partitionDesc = tableDesc.getPartitionMethod();
+ Schema partitionColumns = new Schema();
+ for (Column column :
partitionDesc.getExpressionSchema().getRootColumns()) {
+ partitionColumns.addColumn(column);
+ }
+
+ // Get the array of path filter, accepting all partition paths.
+ PathFilter[] filters =
PartitionedTableRewriter.buildAllAcceptingPathFilters(partitionColumns);
+
+ // loop from one to the number of partition columns
+ Path [] filteredPaths =
PartitionedTableRewriter.toPathArray(fs.listStatus(tablePath, filters[0]));
+
+ // Get all file statuses matched to the ith-level path filter.
+ for (int i = 1; i < partitionColumns.size(); i++) {
+ filteredPaths =
PartitionedTableRewriter.toPathArray(fs.listStatus(filteredPaths, filters[i]));
+ }
+
+ // Find missing partitions from filesystem
+ List<PartitionDescProto> existingPartitions =
catalog.getPartitions(databaseName, simpleTableName);
+ List<String> existingPartitionNames = TUtil.newList();
+ Path existingPartitionPath = null;
+
+ for(PartitionDescProto existingPartition : existingPartitions) {
+ existingPartitionPath = new Path(existingPartition.getPath());
+ existingPartitionNames.add(existingPartition.getPartitionName());
+ if (!fs.exists(existingPartitionPath)) {
+ LOG.info("Partitions missing from Filesystem:" +
existingPartition.getPartitionName());
+ }
+ }
+
+ // Find missing partitions from CatalogStore
+ List<PartitionDescProto> targetPartitions = TUtil.newList();
+ for(Path filteredPath : filteredPaths) {
+ PartitionDescProto targetPartition =
getPartitionDesc(simpleTableName, filteredPath);
+ if
(!existingPartitionNames.contains(targetPartition.getPartitionName())) {
+ LOG.info("Partitions not in CatalogStore:" +
targetPartition.getPartitionName());
+ targetPartitions.add(targetPartition);
+ }
+ }
+
+ catalog.addPartitions(databaseName, simpleTableName, targetPartitions,
true);
+
+ for(PartitionDescProto targetPartition: targetPartitions) {
+ LOG.info("Repair: Added partition to CatalogStore " + tableName +
":" + targetPartition.getPartitionName());
--- End diff --
This will emit too many logs.
> Implement recover partitions
> ----------------------------
>
> Key: TAJO-1673
> URL: https://issues.apache.org/jira/browse/TAJO-1673
> Project: Tajo
> Issue Type: Sub-task
> Components: Catalog
> Reporter: Jaehwa Jung
> Assignee: Jaehwa Jung
> Fix For: 0.11.0, 0.12.0
>
> Attachments: TAJO-1673.patch, TAJO-1673_2.patch
>
>
> Tajo stores a list of partitions for each table in its CatalogStore. If,
> however, new partitions are directly added to HDFS, the CatalogStore will not
> be aware of these partitions unless the user runs ALTER TABLE table_name ADD
> PARTITION commands on each of the newly added partitions.
> However, users can run a CatalogStore check command with the repair table
> option:
> {code:xml}
> MSCK REPAIR TABLE <table_name>
> {code}
> For the reference, I've referenced hive msck command.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)