Repository: tajo Updated Branches: refs/heads/master 00ccb8ba9 -> d80c32b28
http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 4256582..9279f64 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -31,6 +31,8 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; @@ -59,6 +61,7 @@ import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.StageHistory; import org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.worker.FetchImpl; @@ -483,6 +486,18 @@ public class Stage implements EventHandler<StageEvent> { return stageHistory; } + public List<PartitionDescProto> getPartitions() { + List<PartitionDescProto> partitions = TUtil.newList(); + + for(Task eachTask : getTasks()) { + if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) { + partitions.addAll(eachTask.getLastAttempt().getPartitions()); + } + } + + return partitions; + } + /** * It finalizes this stage. It is only invoked when the stage is finalizing. */ http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java index 6c48d3b..f5fcfa7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.state.*; import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.ResourceProtos.TaskCompletionReport; import org.apache.tajo.ResourceProtos.ShuffleFileOutput; @@ -35,6 +36,7 @@ import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptSched import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.querymaster.Task.PullHost; +import org.apache.tajo.util.TUtil; import java.util.ArrayList; import java.util.EnumSet; @@ -67,6 +69,8 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { private CatalogProtos.TableStatsProto inputStats; private CatalogProtos.TableStatsProto resultStats; + private List<PartitionDescProto> partitions; + protected static final StateMachineFactory <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> stateMachineFactory = new StateMachineFactory @@ -190,6 +194,8 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { this.writeLock = readWriteLock.writeLock(); stateMachine = stateMachineFactory.make(this); + + this.partitions = TUtil.newList(); } public TaskAttemptState getState() { @@ -252,6 +258,14 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { return new TableStats(resultStats); } + public List<PartitionDescProto> getPartitions() { + return partitions; + } + + public void setPartitions(List<PartitionDescProto> partitions) { + this.partitions = partitions; + } + private void fillTaskStatistics(TaskCompletionReport report) { this.progress = 1.0f; @@ -392,6 +406,10 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { TaskCompletionReport report = ((TaskCompletionEvent)event).getReport(); try { + if (report.getPartitionsCount() > 0) { + taskAttempt.setPartitions(report.getPartitionsList()); + } + taskAttempt.fillTaskStatistics(report); taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED)); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 5d7a53a..5996118 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.enforce.Enforcer; @@ -85,6 +86,8 @@ public class TaskAttemptContext { private EvalContext evalContext = new EvalContext(); + private List<PartitionDescProto> partitions; + public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext, final TaskAttemptId taskId, final FragmentProto[] fragments, @@ -116,6 +119,8 @@ public class TaskAttemptContext { this.state = TaskAttemptState.TA_PENDING; this.partitionOutputVolume = Maps.newHashMap(); + + this.partitions = TUtil.newList(); } @VisibleForTesting @@ -414,4 +419,18 @@ public class TaskAttemptContext { public EvalContext getEvalContext() { return evalContext; } + + public List<PartitionDescProto> getPartitions() { + return partitions; + } + + public void setPartitions(List<PartitionDescProto> partitions) { + this.partitions = partitions; + } + + public void addPartition(PartitionDescProto partition) { + if (!partitions.contains(partition)) { + partitions.add(partition); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 92c682c..7b8d06f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -329,6 +329,10 @@ public class TaskImpl implements Task { builder.setResultStats(new TableStats().getProto()); } + if (!context.getPartitions().isEmpty()) { + builder.addAllPartitions(context.getPartitions()); + } + Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); if (it.hasNext()) { do { http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/proto/ResourceProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto index 97bf05e..e789b81 100644 --- a/tajo-core/src/main/proto/ResourceProtos.proto +++ b/tajo-core/src/main/proto/ResourceProtos.proto @@ -108,6 +108,7 @@ message TaskCompletionReport { optional TableStatsProto inputStats = 3; optional TableStatsProto resultStats = 4; repeated ShuffleFileOutput shuffleFileOutputs = 5; + repeated PartitionDescProto partitions = 6; } message TaskFatalErrorReport { http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java b/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java index 0e37b47..bb14aec 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java @@ -379,6 +379,7 @@ public class TestSQLAnalyzer { assertEquals("1", value1.getValue()); LiteralValue value2 = (LiteralValue)alterTable.getValues()[1]; assertEquals("2", value2.getValue()); + assertFalse(alterTable.isIfNotExists()); } @Test @@ -397,6 +398,7 @@ public class TestSQLAnalyzer { LiteralValue value2 = (LiteralValue)alterTable.getValues()[1]; assertEquals("2", value2.getValue()); assertEquals(alterTable.getLocation(), "hdfs://xxx.com/warehouse/table1/col1=1/col2=2"); + assertFalse(alterTable.isIfNotExists()); } @Test @@ -418,6 +420,7 @@ public class TestSQLAnalyzer { LiteralValue value3 = (LiteralValue)alterTable.getValues()[2]; assertEquals("11", value3.getValue()); assertEquals(alterTable.getLocation(), "hdfs://xxx.com/warehouse/table1/col1=2015/col2=01/col3=11"); + assertFalse(alterTable.isIfNotExists()); } @Test @@ -432,6 +435,22 @@ public class TestSQLAnalyzer { assertEquals("col1", alterTable.getColumns()[0].getName()); LiteralValue value1 = (LiteralValue)alterTable.getValues()[0]; assertEquals("TAJO", value1.getValue()); + assertFalse(alterTable.isIfNotExists()); + } + + @Test + public void testAlterTableAddPartition5() throws IOException { + String sql = FileUtil.readTextFileFromResource("queries/default/alter_table_add_partition_5.sql"); + Expr expr = parseQuery(sql); + assertEquals(OpType.AlterTable, expr.getType()); + AlterTable alterTable = (AlterTable)expr; + assertEquals(alterTable.getAlterTableOpType(), AlterTableOpType.ADD_PARTITION); + assertEquals(1, alterTable.getColumns().length); + assertEquals(1, alterTable.getValues().length); + assertEquals("col1", alterTable.getColumns()[0].getName()); + LiteralValue value1 = (LiteralValue)alterTable.getValues()[0]; + assertEquals("TAJO", value1.getValue()); + assertTrue(alterTable.isIfNotExists()); } @Test @@ -450,6 +469,7 @@ public class TestSQLAnalyzer { LiteralValue value2 = (LiteralValue)alterTable.getValues()[1]; assertEquals("2", value2.getValue()); assertFalse(alterTable.isPurge()); + assertFalse(alterTable.isIfExists()); } @Test @@ -471,6 +491,7 @@ public class TestSQLAnalyzer { LiteralValue value3 = (LiteralValue)alterTable.getValues()[2]; assertEquals("11", value3.getValue()); assertFalse(alterTable.isPurge()); + assertFalse(alterTable.isIfExists()); } @Test @@ -486,6 +507,23 @@ public class TestSQLAnalyzer { LiteralValue value1 = (LiteralValue)alterTable.getValues()[0]; assertEquals("TAJO", value1.getValue()); assertTrue(alterTable.isPurge()); + assertFalse(alterTable.isIfExists()); + } + + @Test + public void testAlterTableDropPartition4() throws IOException { + String sql = FileUtil.readTextFileFromResource("queries/default/alter_table_drop_partition_4.sql"); + Expr expr = parseQuery(sql); + assertEquals(OpType.AlterTable, expr.getType()); + AlterTable alterTable = (AlterTable)expr; + assertEquals(alterTable.getAlterTableOpType(), AlterTableOpType.DROP_PARTITION); + assertEquals(1, alterTable.getColumns().length); + assertEquals(1, alterTable.getValues().length); + assertEquals("col1", alterTable.getColumns()[0].getName()); + LiteralValue value1 = (LiteralValue)alterTable.getValues()[0]; + assertEquals("TAJO", value1.getValue()); + assertTrue(alterTable.isPurge()); + assertTrue(alterTable.isIfExists()); } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java index 8cdaf80..8339ea7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java @@ -31,7 +31,6 @@ import org.junit.experimental.categories.Category; import java.sql.ResultSet; import java.util.List; -import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.*; @Category(IntegrationTest.class) @@ -87,6 +86,7 @@ public class TestAlterTable extends QueryTestCaseBase { assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(1).getSimpleName(), "col4"); executeDDL("alter_table_add_partition1.sql", null); + executeDDL("alter_table_add_partition2.sql", null); List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions("TestAlterTable", "partitioned_table"); assertNotNull(partitions); @@ -104,6 +104,7 @@ public class TestAlterTable extends QueryTestCaseBase { assertTrue(partitionPath.toString().indexOf("col3=1/col4=2") > 0); executeDDL("alter_table_drop_partition1.sql", null); + executeDDL("alter_table_drop_partition2.sql", null); partitions = catalog.getPartitions("TestAlterTable", "partitioned_table"); assertNotNull(partitions); http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index d18db71..6eb2841 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -19,10 +19,7 @@ package org.apache.tajo.engine.query; import com.google.common.collect.Maps; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.DeflateCodec; @@ -34,12 +31,13 @@ import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.exception.ReturnStateUtil; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; @@ -123,6 +121,10 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType()); assertEquals(1, channel.getShuffleKeys().length); + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + tableDesc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -169,6 +171,10 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType()); assertEquals(1, channel.getShuffleKeys().length); + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + tableDesc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -195,6 +201,10 @@ public class TestTablePartitions extends QueryTestCaseBase { } res.close(); + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + tableDesc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -234,6 +244,10 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(1)); assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(2)); } + + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, + new String[]{"key"}, desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -338,6 +352,9 @@ public class TestTablePartitions extends QueryTestCaseBase { assertResultSet(res, "case13.result"); res.close(); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -411,6 +428,13 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3)); } + res = executeString("SELECT col1, col2, col3 FROM " + tableName); + String result = resultSetToString(res); + res.close(); + + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -438,6 +462,10 @@ public class TestTablePartitions extends QueryTestCaseBase { res.close(); TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, + desc.getStats().getNumRows()); + Path path = new Path(desc.getUri()); FileSystem fs = FileSystem.get(conf); @@ -479,6 +507,11 @@ public class TestTablePartitions extends QueryTestCaseBase { res.close(); desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + + // TODO: When inserting into already exists partitioned table, table status need to change correctly. +// verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, +// desc.getStats().getNumRows()); + path = new Path(desc.getUri()); verifyDirectoriesForThreeColumns(fs, path, 2); @@ -632,6 +665,9 @@ public class TestTablePartitions extends QueryTestCaseBase { } } + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -688,6 +724,9 @@ public class TestTablePartitions extends QueryTestCaseBase { } } + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -783,6 +822,9 @@ public class TestTablePartitions extends QueryTestCaseBase { res.close(); assertEquals(3, i); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -851,6 +893,9 @@ public class TestTablePartitions extends QueryTestCaseBase { assertFalse(res.next()); res.close(); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -874,6 +919,10 @@ public class TestTablePartitions extends QueryTestCaseBase { assertResultSet(res, "case14.result"); res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -900,6 +949,10 @@ public class TestTablePartitions extends QueryTestCaseBase { assertResultSet(res, "case15.result"); res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } } @@ -982,6 +1035,10 @@ public class TestTablePartitions extends QueryTestCaseBase { assertResultSet(res); res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -1009,6 +1066,10 @@ public class TestTablePartitions extends QueryTestCaseBase { assertResultSet(res); res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -1085,6 +1146,10 @@ public class TestTablePartitions extends QueryTestCaseBase { } assertEquals(data.size(), numRows); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, "test_partition"); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, "test_partition", new String[]{"col1"}, + desc.getStats().getNumRows()); + } finally { testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname, TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal); @@ -1178,4 +1243,59 @@ public class TestTablePartitions extends QueryTestCaseBase { } } + /** + * Verify added partitions to a table. This would check each partition's directory using record of table. + * + * + * @param databaseName + * @param tableName + * @param partitionColumns + * @param numRows + * @throws Exception + */ + private void verifyPartitionDirectoryFromCatalog(String databaseName, String tableName, + String[] partitionColumns, Long numRows) throws Exception { + int rowCount = 0; + + // Get all partition column values + StringBuilder query = new StringBuilder(); + query.append("SELECT"); + for (int i = 0; i < partitionColumns.length; i++) { + String partitionColumn = partitionColumns[i]; + if (i > 0) { + query.append(","); + } + query.append(" ").append(partitionColumn); + } + query.append(" FROM ").append(tableName); + ResultSet res = executeString(query.toString()); + + StringBuilder partitionName = new StringBuilder(); + PartitionDescProto partitionDescProto = null; + + // Check whether that partition's directory exist or doesn't exist. + while(res.next()) { + partitionName.delete(0, partitionName.length()); + + for (int i = 0; i < partitionColumns.length; i++) { + String partitionColumn = partitionColumns[i]; + if (i > 0) { + partitionName.append("/"); + } + partitionName.append(partitionColumn).append("=").append(res.getString(partitionColumn)); + } + partitionDescProto = catalog.getPartition(databaseName, tableName, partitionName.toString()); + assertNotNull(partitionDescProto); + assertTrue(partitionDescProto.getPath().indexOf(tableName + "/" + partitionName.toString()) > 0); + + rowCount++; + } + + res.close(); + + // Check row count. + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(numRows, new Long(rowCount)); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_add_partition2.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_add_partition2.sql b/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_add_partition2.sql new file mode 100644 index 0000000..315ac47 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_add_partition2.sql @@ -0,0 +1 @@ +ALTER TABLE partitioned_table ADD IF NOT EXISTS PARTITION (col3 = 1 , col4 = 2) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql b/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql new file mode 100644 index 0000000..0d4c932 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql @@ -0,0 +1 @@ +ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/queries/default/alter_table_add_partition_5.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/default/alter_table_add_partition_5.sql b/tajo-core/src/test/resources/queries/default/alter_table_add_partition_5.sql new file mode 100644 index 0000000..127d999 --- /dev/null +++ b/tajo-core/src/test/resources/queries/default/alter_table_add_partition_5.sql @@ -0,0 +1 @@ +ALTER TABLE table1 ADD IF NOT EXISTS PARTITION (col1 = 'TAJO' ) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/queries/default/alter_table_drop_partition_4.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/default/alter_table_drop_partition_4.sql b/tajo-core/src/test/resources/queries/default/alter_table_drop_partition_4.sql new file mode 100644 index 0000000..44c7977 --- /dev/null +++ b/tajo-core/src/test/resources/queries/default/alter_table_drop_partition_4.sql @@ -0,0 +1 @@ +ALTER TABLE table1 DROP IF EXISTS PARTITION (col1 = 'TAJO' ) PURGE \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result b/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result index d01038a..b09f134 100644 --- a/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result +++ b/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result @@ -1,5 +1,5 @@ OK -ERROR: "key2" column is not the partition key of "default.testaltertableaddpartition". +ERROR: 'key2' column is not the partition key OK OK ERROR: partition 'key=0.1' does not exist http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-dist/src/main/conf/catalog-site.xml.template ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/conf/catalog-site.xml.template b/tajo-dist/src/main/conf/catalog-site.xml.template index 365de6b..9db714e 100644 --- a/tajo-dist/src/main/conf/catalog-site.xml.template +++ b/tajo-dist/src/main/conf/catalog-site.xml.template @@ -49,7 +49,7 @@ </property> <property> <name>tajo.catalog.jdbc.uri</name> - <value>jdbc:mysql://<host name>:<mysql port>/<database name for tajo>?createDatabaseIfNotExist=true</value> + <value>jdbc:mysql://<host name>:<mysql port>/<database name for tajo>?rewriteBatchedStatements=true</value> </property> --> @@ -61,7 +61,7 @@ </property> <property> <name>tajo.catalog.jdbc.uri</name> - <value>jdbc:mariadb://<mariadb host name>:<mariadb port>/<database name for tajo>?createDatabaseIfNotExist=true</value> + <value>jdbc:mariadb://<mariadb host name>:<mariadb port>/<database name for tajo>?rewriteBatchedStatements=true</value> </property> --> http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-docs/src/main/sphinx/sql_language/alter_table.rst ---------------------------------------------------------------------- diff --git a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst index b9b55fd..ffc34d1 100644 --- a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst +++ b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst @@ -79,7 +79,7 @@ ADD PARTITION ALTER TABLE table1 ADD PARTITION (col1 = 1 , col2 = 2) ALTER TABLE table1 ADD PARTITION (col1 = 1 , col2 = 2) LOCATION 'hdfs://xxx.com/warehouse/table1/col1=1/col2=2' -You can use ``ALTER TABLE ADD PARTITION`` to add partitions to a table. The location must be a directory inside of which data files reside. If the location doesn't exist on the file system, Tajo will make the location by force. ``ADD PARTITION`` changes the table metadata, but does not load data. If the data does not exist in the partition's location, queries will not return any results. +You can use ``ALTER TABLE ADD PARTITION`` to add partitions to a table. The location must be a directory inside of which data files reside. If the location doesn't exist on the file system, Tajo will make the location by force. ``ADD PARTITION`` changes the table metadata, but does not load data. If the data does not exist in the partition's location, queries will not return any results. An error is thrown if the partition for the table already exists. You can use ``IF NOT EXISTS`` to skip the error. ======================== DROP PARTITION @@ -89,12 +89,11 @@ You can use ``ALTER TABLE ADD PARTITION`` to add partitions to a table. The loca .. code-block:: sql - ALTER TABLE <table_name> [IF NOT EXISTS] DROP PARTITION (<partition column> = <partition value>, ...) [PURGE] + ALTER TABLE <table_name> [IF EXISTS] DROP PARTITION (<partition column> = <partition value>, ...) [PURGE] For example: ALTER TABLE table1 DROP PARTITION (col1 = 1 , col2 = 2) ALTER TABLE table1 DROP PARTITION (col1 = '2015' , col2 = '01', col3 = '11' ) ALTER TABLE table1 DROP PARTITION (col1 = 'TAJO' ) PURGE -You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. This removes the data for a managed table - and this doesn't remove the data for an external table. But if ``PURGE`` is specified for an external table, the partition data will be removed. The metadata is completely lost in all cases. \ No newline at end of file +You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. This removes the data for a managed table and this doesn't remove the data for an external table. But if ``PURGE`` is specified for an external table, the partition data will be removed. The metadata is completely lost in all cases. An error is thrown if the partition for the table doesn't exists. You can use ``IF EXISTS`` to skip the error. http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index c9e101b..c0727bb 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -2149,6 +2149,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex } alterTableNode.setPurge(alterTable.isPurge()); + alterTableNode.setIfNotExists(alterTable.isIfNotExists()); + alterTableNode.setIfExists(alterTable.isIfExists()); alterTableNode.setAlterTableOpType(alterTable.getAlterTableOpType()); return alterTableNode; } http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java index ecb173a..4e25baa 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java @@ -51,6 +51,10 @@ public class AlterTableNode extends LogicalNode { private String location; @Expose private boolean isPurge; + @Expose + private boolean ifNotExists; + @Expose + private boolean ifExists; public AlterTableNode(int pid) { super(pid, NodeType.ALTER_TABLE); @@ -158,6 +162,22 @@ public class AlterTableNode extends LogicalNode { this.isPurge = isPurge; } + public boolean isIfNotExists() { + return ifNotExists; + } + + public void setIfNotExists(boolean ifNotExists) { + this.ifNotExists = ifNotExists; + } + + public boolean isIfExists() { + return ifExists; + } + + public void setIfExists(boolean ifExists) { + this.ifExists = ifExists; + } + @Override public PlanString getPlanString() { return new PlanString(this); @@ -166,7 +186,7 @@ public class AlterTableNode extends LogicalNode { @Override public int hashCode() { return Objects.hashCode(tableName, addNewColumn, alterTableOpType, columnName, newColumnName, newTableName, - tableName, properties, partitionColumns, partitionValues, location, isPurge); + tableName, properties, partitionColumns, partitionValues, location, isPurge, ifNotExists, ifExists); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index 25897f2..d298cc8 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -633,6 +633,7 @@ public class LogicalNodeDeserializer { if (alterPartition.getLocation() != null) { alterTable.setLocation(alterPartition.getLocation()); } + alterTable.setIfNotExists(alterPartition.getIfNotExists()); break; case DROP_PARTITION: alterPartition = alterTableProto.getAlterPartition(); @@ -640,6 +641,8 @@ public class LogicalNodeDeserializer { .getColumnNamesCount()])); alterTable.setPartitionValues(alterPartition.getPartitionValuesList().toArray(new String[alterPartition .getPartitionValuesCount()])); + alterTable.setPurge(alterPartition.getPurge()); + alterTable.setIfExists(alterPartition.getIfExists()); break; default: throw new UnimplementedException("Unknown SET type in ALTER TABLE: " + alterTableProto.getSetType().name()); http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index 4019322..5acbbee 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -320,6 +320,8 @@ message AlterTableNode { repeated string partitionValues = 21; optional string location = 3; optional bool purge = 4; + optional bool ifNotExists = 5; + optional bool ifExists = 6; } required string tableName = 1;
