Repository: tajo Updated Branches: refs/heads/branch-0.11.0 b8e947313 -> 0476ab844
TAJO-1901: Repair partition throws ArrayIndexOutOfBoundsException occasionally. Signed-off-by: Hyunsik Choi <[email protected]> Closes #796 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0476ab84 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0476ab84 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0476ab84 Branch: refs/heads/branch-0.11.0 Commit: 0476ab844b71a05e294c21d58841b6c88ad38d72 Parents: b8e9473 Author: JaeHwa Jung <[email protected]> Authored: Sun Oct 4 15:31:38 2015 -0700 Committer: Hyunsik Choi <[email protected]> Committed: Sun Oct 4 15:35:50 2015 -0700 ---------------------------------------------------------------------- CHANGES | 5 +- .../tajo/engine/query/TestAlterTable.java | 343 ++++++++++++++++++- .../apache/tajo/master/exec/DDLExecutor.java | 32 +- .../rewrite/rules/PartitionedTableRewriter.java | 2 +- 4 files changed, 364 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0476ab84/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index f37969b..05e98c6 100644 --- a/CHANGES +++ b/CHANGES @@ -1,7 +1,5 @@ Tajo Change Log -Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES @@ -287,6 +285,9 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1901: Repair partition throws ArrayIndexOutOfBoundsException + occasionally. (Contributed by jaehwa, committed by hyunsik) + TAJO-1902: Add line delimiter for repair partition in TajoDump. (jaehwa) TAJO-1889: UndefinedColumnException when a query with table subquery is http://git-wip-us.apache.org/repos/asf/tajo/blob/0476ab84/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java index 9a30012..58ceb74 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java @@ -23,22 +23,35 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.IntegrationTest; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.exception.UndefinedDatabaseException; -import org.apache.tajo.exception.UndefinedPartitionException; -import org.apache.tajo.exception.UndefinedPartitionMethodException; -import org.apache.tajo.exception.UndefinedTableException; +import org.apache.tajo.exception.*; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.File; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.List; import static org.junit.Assert.*; @Category(IntegrationTest.class) public class TestAlterTable extends QueryTestCaseBase { + + @Before + public void setUp() throws Exception { + executeString("create database " + getCurrentDatabase()).close(); + } + + @After + public void tearDown() throws Exception { + executeString("drop database " + getCurrentDatabase()).close(); + } + @Test public final void testAlterTableName() throws Exception { List<String> createdNames = executeDDL("table1_ddl.sql", "table1.tbl", "ABC"); @@ -58,7 +71,7 @@ public class TestAlterTable extends QueryTestCaseBase { public final void testAlterTableAddNewColumn() throws Exception { List<String> createdNames = executeDDL("table1_ddl.sql", "table1.tbl", "EFG"); executeDDL("alter_table_add_new_column_ddl.sql", null); - assertColumnExists(createdNames.get(0),"cool"); + assertColumnExists(createdNames.get(0), "cool"); } @Test @@ -199,6 +212,290 @@ public class TestAlterTable extends QueryTestCaseBase { catalog.dropTable(tableName); } + @Test + public final void testRepairPartitionWithDatabaseNameIncludeTableName() throws Exception { + String databaseName = "test_repair_partition"; + String tableName = "part"; + String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName); + + executeString("create database " + databaseName).close(); + executeString("create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key float8) " + + " as select l_orderkey, l_partkey, l_quantity from default.lineitem").close(); + + TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName); + assertNotNull(tableDesc); + + verifyPartitionCount(databaseName, tableName, 5); + + ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + String result = resultSetToString(res); + String expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,36.0\n" + + "1,1,17.0\n" + + "2,2,38.0\n" + + "3,3,49.0\n" + + "3,2,45.0\n"; + res.close(); + assertEquals(expectedResult, result); + + // Remove all partitions + dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns()); + + verifyPartitionCount(databaseName, tableName, 0); + + executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close(); + + verifyPartitionCount(databaseName, tableName, 5); + + res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + result = resultSetToString(res); + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); + executeString("DROP database " + databaseName).close(); + } + + @Test + public void testRepairPartitionWithAbnormalDirectories() throws Exception { + String databaseName = getCurrentDatabase().toLowerCase(); + String tableName = "testRepairPartitionWithAbnormalDirectories".toLowerCase(); + String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName); + + executeString("create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key float8) " + + " as select l_orderkey, l_partkey, l_quantity from default.lineitem").close(); + + TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName); + assertNotNull(tableDesc); + + verifyPartitionCount(databaseName, tableName, 5); + + ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + String result = resultSetToString(res); + String expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,36.0\n" + + "1,1,17.0\n" + + "2,2,38.0\n" + + "3,3,49.0\n" + + "3,2,45.0\n"; + res.close(); + assertEquals(expectedResult, result); + + // Remove all partitions + dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns()); + + verifyPartitionCount(databaseName, tableName, 0); + + // Make abnormal directories + FileSystem fs = FileSystem.get(conf); + Path path = new Path(tableDesc.getUri().getPath(), "key=100.0"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().getPath(), "key=110.0"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().getPath(), "key="); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().getPath(), "col1=a"); + fs.mkdirs(path); + assertEquals(9, fs.listStatus(path.getParent()).length); + + executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close(); + + verifyPartitionCount(databaseName, tableName, 7); + + res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + result = resultSetToString(res); + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); + } + + @Test + public void testRepairPartitionWithDatePartitionColumn() throws Exception { + String databaseName = getCurrentDatabase().toLowerCase(); + String tableName = "testRepairPartitionWithDatePartitionColumn".toLowerCase(); + String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName); + + executeString( + "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key date) " + + " as select l_orderkey, l_partkey, l_shipdate::date from default.lineitem").close(); + + TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName); + assertNotNull(tableDesc); + + verifyPartitionCount(databaseName, tableName, 5); + + ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + String result = resultSetToString(res); + String expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,1996-04-12\n" + + "1,1,1996-03-13\n" + + "2,2,1997-01-28\n" + + "3,3,1993-11-09\n" + + "3,2,1994-02-02\n"; + res.close(); + assertEquals(expectedResult, result); + + // Remove all partitions + dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns()); + + verifyPartitionCount(databaseName, tableName, 0); + + executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close(); + + verifyPartitionCount(databaseName, tableName, 5); + + res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + result = resultSetToString(res); + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); + } + + @Test + public void testRepairPartitionWithTimestampPartitionColumn() throws Exception { + String databaseName = getCurrentDatabase().toLowerCase(); + String tableName = "testRepairPartitionWithTimestampPartitionColumn".toLowerCase(); + String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName); + + executeString( + "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key timestamp) " + + " as select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from default.lineitem"); + + TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName); + assertNotNull(tableDesc); + + verifyPartitionCount(databaseName, tableName, 5); + + ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + String result = resultSetToString(res); + String expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,1996-04-12 00:00:00\n" + + "1,1,1996-03-13 00:00:00\n" + + "2,2,1997-01-28 00:00:00\n" + + "3,3,1993-11-09 00:00:00\n" + + "3,2,1994-02-02 00:00:00\n"; + res.close(); + assertEquals(expectedResult, result); + + // Remove all partitions + dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns()); + + verifyPartitionCount(databaseName, tableName, 0); + + executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close(); + + verifyPartitionCount(databaseName, tableName, 5); + + res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + result = resultSetToString(res); + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); + } + + @Test + public void testRepairPartitionWithTimesPartitionColumn() throws Exception { + String databaseName = getCurrentDatabase().toLowerCase(); + String tableName = "testRepairPartitionWithTimesPartitionColumn".toLowerCase(); + String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName); + + executeString( + "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key time) " + + " as select l_orderkey, l_partkey " + + " , CASE l_shipdate WHEN '1996-03-13' THEN cast ('11:20:40' as time) " + + " WHEN '1997-01-28' THEN cast ('12:10:20' as time) " + + " WHEN '1994-02-02' THEN cast ('12:10:30' as time) " + + " ELSE cast ('00:00:00' as time) END " + + " from default.lineitem"); + + TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName); + assertNotNull(tableDesc); + + ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + String result = resultSetToString(res); + String expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,11:20:40\n" + + "1,1,00:00:00\n" + + "2,2,12:10:20\n" + + "3,3,00:00:00\n" + + "3,2,12:10:30\n"; + res.close(); + assertEquals(expectedResult, result); + + verifyPartitionCount(databaseName, tableName, 4); + + // Remove all partitions + dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns()); + + verifyPartitionCount(databaseName, tableName, 0); + + executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close(); + + verifyPartitionCount(databaseName, tableName, 4); + + res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;"); + result = resultSetToString(res); + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); + } + + + @Test + public void testRepairPartitionWithMutiplePartitionColumn() throws Exception { + String databaseName = getCurrentDatabase().toLowerCase(); + String tableName = "testRepairPartitionWithMutiplePartitionColumn".toLowerCase(); + String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName); + + executeString("create table " + canonicalTableName + " (col4 text) " + + " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " + + "l_quantity from default.lineitem"); + + TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName); + assertNotNull(tableDesc); + + ResultSet res = executeString("SELECT * FROM " + canonicalTableName + + " ORDER BY col1, col2 desc, col3 desc, col4;"); + String result = resultSetToString(res); + String expectedResult = "col4,col1,col2,col3\n" + + "-------------------------------\n" + + "N,1,1,36.0\n" + + "N,1,1,17.0\n" + + "N,2,2,38.0\n" + + "R,3,3,49.0\n" + + "R,3,2,45.0\n"; + res.close(); + assertEquals(expectedResult, result); + + verifyPartitionCount(databaseName, tableName, 5); + + // Remove all partitions + dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns()); + + verifyPartitionCount(databaseName, tableName, 0); + + executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close(); + + verifyPartitionCount(databaseName, tableName, 5); + + res = executeString("SELECT * FROM " + canonicalTableName + + " ORDER BY col1, col2 desc, col3 desc, col4;"); result = resultSetToString(res); + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); + } + + private void verifyPartitionCount(String databaseName, String tableName, int expectedCount) throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedPartitionException { @@ -206,4 +503,40 @@ public class TestAlterTable extends QueryTestCaseBase { assertNotNull(partitions); assertEquals(partitions.size(), expectedCount); } + + private void dropPartitions(String databaseName, String tableName, List<Column> colums) + throws Exception { + String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName); + List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitionsOfTable(databaseName, tableName); + + StringBuilder sb = new StringBuilder(); + for (CatalogProtos.PartitionDescProto partition : partitions) { + + sb.delete(0, sb.length()); + sb.append("ALTER TABLE ").append(canonicalTableName).append(" DROP PARTITION ("); + + String[] splitPartitionName = partition.getPartitionName().split(File.separator); + for(int i = 0; i < splitPartitionName.length; i++) { + String[] partitionColumnValue = splitPartitionName[i].split("="); + if (i > 0) { + sb.append(","); + } + + switch (colums.get(i).getDataType().getType()) { + case TEXT: + case TIME: + case TIMESTAMP: + case DATE: + sb.append(partitionColumnValue[0]).append("='").append(partitionColumnValue[1]).append("'"); + break; + default: + sb.append(partitionColumnValue[0]).append("=").append(partitionColumnValue[1]); + break; + } + } + sb.append(")"); + executeString(sb.toString()).close(); + } + } + } http://git-wip-us.apache.org/repos/asf/tajo/blob/0476ab84/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java index 013570e..588ea1f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java @@ -50,6 +50,7 @@ import org.apache.tajo.util.Pair; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.TUtil; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -625,13 +626,25 @@ public class DDLExecutor { // Find missing partitions from CatalogStore List<PartitionDescProto> targetPartitions = TUtil.newList(); for(Path filteredPath : filteredPaths) { - PartitionDescProto targetPartition = getPartitionDesc(simpleTableName, filteredPath); - if (!existingPartitionNames.contains(targetPartition.getPartitionName())) { + + int startIdx = filteredPath.toString().indexOf(PartitionedTableRewriter.getColumnPartitionPathPrefix + (partitionColumns)); + + // if there is partition column in the path + if (startIdx > -1) { + PartitionDescProto targetPartition = getPartitionDesc(tablePath, filteredPath); + if (!existingPartitionNames.contains(targetPartition.getPartitionName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Partitions not in CatalogStore:" + targetPartition.getPartitionName()); + } + targetPartitions.add(targetPartition); + } + } else { if (LOG.isDebugEnabled()) { - LOG.debug("Partitions not in CatalogStore:" + targetPartition.getPartitionName()); + LOG.debug("Invalid partition path:" + filteredPath.toString()); } - targetPartitions.add(targetPartition); } + } catalog.addPartitions(databaseName, simpleTableName, targetPartitions, true); @@ -645,12 +658,11 @@ public class DDLExecutor { LOG.info("Total added partitions to CatalogStore: " + targetPartitions.size()); } - private PartitionDescProto getPartitionDesc(String tableName, Path path) throws IOException { - String partitionPath = path.toString(); + private PartitionDescProto getPartitionDesc(Path tablePath, Path partitionPath) throws IOException { + String partitionName = StringUtils.unescapePathName(partitionPath.toString()); - String partitionName = StringUtils.unescapePathName(partitionPath); - int startIndex = partitionPath.indexOf(tableName); - partitionName = partitionName.substring(startIndex + tableName.length() + 1, partitionPath.length()); + int startIndex = partitionName.indexOf(tablePath.toString()) + tablePath.toString().length(); + partitionName = partitionName.substring(startIndex + File.separator.length()); CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); builder.setPartitionName(partitionName); @@ -668,7 +680,7 @@ public class DDLExecutor { builder.addPartitionKeys(keyBuilder.build()); } - builder.setPath(partitionPath); + builder.setPath(partitionPath.toString()); return builder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/0476ab84/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 5b1a1f1..fc0b1bb 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -479,7 +479,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { * @param partitionColumn the schema of column partition * @return The first part string of column partition path. */ - private static String getColumnPartitionPathPrefix(Schema partitionColumn) { + public static String getColumnPartitionPathPrefix(Schema partitionColumn) { StringBuilder sb = new StringBuilder(); sb.append(partitionColumn.getColumn(0).getSimpleName()).append("="); return sb.toString();
