Repository: tajo Updated Branches: refs/heads/master 1a7c353c2 -> 4be674610
TAJO-1798: Dynamic partitioning occasionally fails. Closes #709 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4be67461 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4be67461 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4be67461 Branch: refs/heads/master Commit: 4be6746102d026dcf47b12a0548ddb15f33bde3e Parents: 1a7c353 Author: JaeHwa Jung <[email protected]> Authored: Wed Aug 26 18:02:20 2015 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Wed Aug 26 18:02:20 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/engine/query/TestTablePartitions.java | 46 ++++++++++++++-- .../java/org/apache/tajo/querymaster/Query.java | 56 +++++++++++--------- .../java/org/apache/tajo/querymaster/Stage.java | 13 +++-- .../apache/tajo/querymaster/TaskAttempt.java | 17 +++--- 5 files changed, 91 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/4be67461/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index c6a85ad..32e84b5 100644 --- a/CHANGES +++ b/CHANGES @@ -234,6 +234,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1798: Dynamic partitioning occasionally fails. (jaehwa) + TAJO-1799: Fix incorrect event handler when kill-query failed. (jinho) TAJO-1783: Query result is not returned by invalid output path. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/4be67461/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 6eb2841..952e26a 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -23,10 +23,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.DeflateCodec; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.*; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; @@ -1298,4 +1295,45 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(numRows, new Long(rowCount)); } } + + @Test + public final void testDuplicatedPartitions() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testDuplicatedPartitions"); + + try { + executeString("CREATE TABLE lineitem2 as select * from lineitem").close(); + + // Execute UNION ALL statement for creating multiple output files. + if (nodeType == NodeType.INSERT) { + executeString( + "create table " + tableName + " (col1 int4, col2 int4) partition by column(key text) ").close(); + + executeString( + "insert overwrite into " + tableName + + " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all" + + " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b" + ).close(); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key text) as " + + " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all" + + " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b" + ).close(); + } + + // If duplicated partitions had been removed, partitions just will contain 'KEY=N' partition and 'KEY=R' + // partition. In previous Query and Stage, duplicated partitions were not deleted because they had been in List. + // If you want to verify duplicated partitions, you need to use List instead of Set with DerbyStore. + List<PartitionDescProto> partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, tableName); + assertEquals(2, partitions.size()); + + PartitionDescProto firstPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=N"); + assertNotNull(firstPartition); + PartitionDescProto secondPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=R"); + assertNotNull(secondPartition); + } finally { + executeString("DROP TABLE lineitem2 PURGE"); + executeString("DROP TABLE " + tableName + " PURGE"); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4be67461/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index b09d5fd..9560353 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -18,6 +18,7 @@ package org.apache.tajo.querymaster; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; @@ -333,14 +334,17 @@ public class Query implements EventHandler<QueryEvent> { } public List<PartitionDescProto> getPartitions() { - List<PartitionDescProto> partitions = new ArrayList<PartitionDescProto>(); + Set<PartitionDescProto> partitions = TUtil.newHashSet(); for(Stage eachStage : getStages()) { - if (!eachStage.getPartitions().isEmpty()) { - partitions.addAll(eachStage.getPartitions()); - } + partitions.addAll(eachStage.getPartitions()); } + return Lists.newArrayList(partitions); + } - return partitions; + public void clearPartitions() { + for(Stage eachStage : getStages()) { + eachStage.clearPartitions(); + } } public List<String> getDiagnostics() { @@ -505,30 +509,30 @@ public class Query implements EventHandler<QueryEvent> { QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); - TableDesc desc = query.getResultDesc(); - - // If there is partitions - List<PartitionDescProto> partitions = query.getPartitions(); - if (partitions!= null && !partitions.isEmpty()) { - - String databaseName, simpleTableName; - - if (CatalogUtil.isFQTableName(desc.getName())) { - String[] split = CatalogUtil.splitFQTableName(desc.getName()); - databaseName = split[0]; - simpleTableName = split[1]; + // Add dynamic partitions to catalog for partition table. + if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) { + List<PartitionDescProto> partitions = query.getPartitions(); + if (partitions != null) { + String databaseName, simpleTableName; + + if (CatalogUtil.isFQTableName(tableDesc.getName())) { + String[] split = CatalogUtil.splitFQTableName(tableDesc.getName()); + databaseName = split[0]; + simpleTableName = split[1]; + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleTableName = tableDesc.getName(); + } + + // Store partitions to CatalogStore using alter table statement. + catalog.addPartitions(databaseName, simpleTableName, partitions, true); + LOG.info("Added partitions to catalog (total=" + partitions.size() + ")"); } else { - databaseName = queryContext.getCurrentDatabase(); - simpleTableName = desc.getName(); + LOG.info("Can't find partitions for adding."); } - - // Store partitions to CatalogStore using alter table statement. - catalog.addPartitions(databaseName, simpleTableName, partitions, true); - } else { - LOG.info("Can't find partitions for adding."); + query.clearPartitions(); } - - } catch (Exception e) { + } catch (Throwable e) { query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); return QueryState.QUERY_ERROR; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4be67461/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index cf5cdbd..f6c9cdb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -487,9 +487,8 @@ public class Stage implements EventHandler<StageEvent> { return stageHistory; } - public List<PartitionDescProto> getPartitions() { - List<PartitionDescProto> partitions = TUtil.newList(); - + public Set<PartitionDescProto> getPartitions() { + Set<PartitionDescProto> partitions = TUtil.newHashSet(); for(Task eachTask : getTasks()) { if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) { partitions.addAll(eachTask.getLastAttempt().getPartitions()); @@ -499,6 +498,14 @@ public class Stage implements EventHandler<StageEvent> { return partitions; } + public void clearPartitions() { + for(Task eachTask : getTasks()) { + if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) { + eachTask.getLastAttempt().getPartitions().clear(); + } + } + } + /** * It finalizes this stage. It is only invoked when the stage is finalizing. */ http://git-wip-us.apache.org/repos/asf/tajo/blob/4be67461/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java index f5fcfa7..cda62a4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -38,9 +38,7 @@ import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.querymaster.Task.PullHost; import org.apache.tajo.util.TUtil; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -69,7 +67,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { private CatalogProtos.TableStatsProto inputStats; private CatalogProtos.TableStatsProto resultStats; - private List<PartitionDescProto> partitions; + private Set<PartitionDescProto> partitions; protected static final StateMachineFactory <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> @@ -194,8 +192,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { this.writeLock = readWriteLock.writeLock(); stateMachine = stateMachineFactory.make(this); - - this.partitions = TUtil.newList(); + this.partitions = TUtil.newHashSet(); } public TaskAttemptState getState() { @@ -258,12 +255,12 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { return new TableStats(resultStats); } - public List<PartitionDescProto> getPartitions() { + public Set<PartitionDescProto> getPartitions() { return partitions; } - public void setPartitions(List<PartitionDescProto> partitions) { - this.partitions = partitions; + public void addPartitions(List<PartitionDescProto> partitions) { + this.partitions.addAll(partitions); } private void fillTaskStatistics(TaskCompletionReport report) { @@ -407,7 +404,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { try { if (report.getPartitionsCount() > 0) { - taskAttempt.setPartitions(report.getPartitionsList()); + taskAttempt.addPartitions(report.getPartitionsList()); } taskAttempt.fillTaskStatistics(report);
