Repository: tajo Updated Branches: refs/heads/master ff57c77ea -> 5c852b798
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java index 2b3887f..25a2fbc 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java @@ -22,7 +22,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TaskId; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.worker.InterDataRetriever; @@ -80,8 +80,8 @@ public class TestHttpDataServer { public final void testInterDataRetriver() throws Exception { MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); ExecutionBlockId schid = plan.newExecutionBlockId(); - QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid); - QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid); + TaskId qid1 = QueryIdFactory.newTaskId(schid); + TaskId qid2 = QueryIdFactory.newTaskId(schid); File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out"); qid1Dir.mkdirs(); @@ -121,8 +121,8 @@ public class TestHttpDataServer { public final void testNoSuchFile() throws Exception { MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null); ExecutionBlockId schid = plan.newExecutionBlockId(); - QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid); - QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid); + TaskId qid1 = QueryIdFactory.newTaskId(schid); + TaskId qid2 = QueryIdFactory.newTaskId(schid); File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out"); qid1Dir.mkdirs(); @@ -154,7 +154,7 @@ public class TestHttpDataServer { assertDataRetrival(qid1, addr.getPort(), watermark1); } - private static void assertDataRetrival(QueryUnitId id, int port, + private static void assertDataRetrival(TaskId id, int port, String watermark) throws IOException { URL url = new URL("http://127.0.0.1:"+port + "/?qid=" + id.toString() + "&fn=testHttp"); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql deleted file mode 100644 index f6d0eb3..0000000 --- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case1.sql +++ /dev/null @@ -1 +0,0 @@ -select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql deleted file mode 100644 index c3e09f1..0000000 --- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case2.sql +++ /dev/null @@ -1,5 +0,0 @@ -select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum -(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as - count_order -from lineitem -group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql b/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql deleted file mode 100644 index a0f9c78..0000000 --- a/tajo-core/src/test/resources/queries/TestQueryUnitStatusUpdate/case3.sql +++ /dev/null @@ -1,10 +0,0 @@ -select * -from ( - select a.col1, a.col2, a.key - from ColumnPartitionedTable a - join ColumnPartitionedTable b on a.key = b.key - where - (a.key = 45.0 or a.key = 38.0) -) test -order by - col1, col2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql new file mode 100644 index 0000000..f6d0eb3 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case1.sql @@ -0,0 +1 @@ +select l_linenumber, count(1) as unique_key from lineitem group by l_linenumber; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql new file mode 100644 index 0000000..c3e09f1 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case2.sql @@ -0,0 +1,5 @@ +select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum +(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as + count_order +from lineitem +group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql new file mode 100644 index 0000000..a0f9c78 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestTaskStatusUpdate/case3.sql @@ -0,0 +1,10 @@ +select * +from ( + select a.col1, a.col2, a.key + from ColumnPartitionedTable a + join ColumnPartitionedTable b on a.key = b.key + where + (a.key = 45.0 or a.key = 38.0) +) test +order by + col1, col2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-dist/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index c69c7d9..c408b16 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -225,8 +225,8 @@ <argument>Tajo</argument> <argument>org.apache.tajo.master.querymaster.Query, org.apache.tajo.master.querymaster.SubQuery, - org.apache.tajo.master.querymaster.QueryUnit, - org.apache.tajo.master.querymaster.QueryUnitAttempt + org.apache.tajo.master.querymaster.Task, + org.apache.tajo.master.querymaster.TaskAttempt </argument> <argument>Tajo.gv</argument> </arguments> http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java index 31db15c..bfb70b4 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java @@ -232,7 +232,7 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { private List<String> splitMaps(List<String> qids) { if (null == qids) { - LOG.error("QueryUnitId is EMPTY"); + LOG.error("QueryId is EMPTY"); return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java index 67e7423..5591bba 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java @@ -23,8 +23,8 @@ import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; import org.apache.tajo.pullserver.FileAccessForbiddenException; import org.apache.tajo.util.TajoIdUtils; import org.jboss.netty.channel.ChannelHandlerContext; @@ -75,13 +75,13 @@ public class AdvancedDataRetriever implements DataRetriever { if (params.containsKey("sid")) { List<FileChunk> chunks = Lists.newArrayList(); - List<String> queryUnidIds = splitMaps(params.get("qid")); - for (String eachQueryUnitId : queryUnidIds) { - String[] queryUnitIdSeqTokens = eachQueryUnitId.split("_"); + List<String> taskIds = splitMaps(params.get("qid")); + for (String eachTaskId : taskIds) { + String[] taskIdSeqTokens = eachTaskId.split("_"); ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0)); - QueryUnitId quid = new QueryUnitId(ebId, Integer.parseInt(queryUnitIdSeqTokens[0])); + TaskId quid = new TaskId(ebId, Integer.parseInt(taskIdSeqTokens[0])); - QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid, Integer.parseInt(queryUnitIdSeqTokens[1])); + TaskAttemptId attemptId = new TaskAttemptId(quid, Integer.parseInt(taskIdSeqTokens[1])); RetrieverHandler handler = handlerMap.get(attemptId.toString()); FileChunk chunk = handler.get(params); @@ -113,7 +113,7 @@ public class AdvancedDataRetriever implements DataRetriever { private List<String> splitMaps(List<String> qids) { if (null == qids) { - LOG.error("QueryUnitId is EMPTY"); + LOG.error("QueryId is EMPTY"); return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java index e2d89d6..07a51ba 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java @@ -68,7 +68,7 @@ public abstract class StorageManager { private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { Configuration.class, - QueryUnitAttemptId.class, + TaskAttemptId.class, Schema.class, TableMeta.class, Path.class @@ -446,7 +446,7 @@ public abstract class StorageManager { * @throws java.io.IOException */ public Appender getAppender(OverridableConf queryContext, - QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) + TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) throws IOException { Appender appender; @@ -511,7 +511,7 @@ public abstract class StorageManager { * @param <T> * @return The scanner instance */ - public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, QueryUnitAttemptId taskAttemptId, + public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) { T result; try { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java index 2b196c9..af3d623 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java @@ -158,8 +158,8 @@ public class StorageUtil extends StorageConstants { Path lastFile = fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1); String pathName = lastFile.getName(); - // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq> - // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence> + // 0.8: pathName = part-<ExecutionBlockId.seq>-<TaskId.seq> + // 0.9: pathName = part-<ExecutionBlockId.seq>-<TaskId.seq>-<Sequence> String[] pathTokens = pathName.split("-"); if (pathTokens.length == 3) { return -1; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java index 8615235..4836352 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java @@ -21,7 +21,7 @@ package org.apache.tajo.storage.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; @@ -45,7 +45,7 @@ public abstract class AbstractHBaseAppender implements Appender { protected Configuration conf; protected Schema schema; protected TableMeta meta; - protected QueryUnitAttemptId taskAttemptId; + protected TaskAttemptId taskAttemptId; protected Path stagingDir; protected boolean inited = false; @@ -72,7 +72,7 @@ public abstract class AbstractHBaseAppender implements Appender { protected KeyValue[] keyValues; - public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + public AbstractHBaseAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta, Path stagingDir) { this.conf = conf; this.schema = schema; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java index 50f61a8..45a1bff 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; @@ -38,7 +38,7 @@ public class HBasePutAppender extends AbstractHBaseAppender { private HTableInterface htable; private long totalNumBytes; - public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + public HBasePutAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta, Path stagingDir) { super(conf, taskAttemptId, schema, meta, stagingDir); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java index a6e7a81..de4b4cb 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java @@ -539,7 +539,7 @@ public class HBaseStorageManager extends StorageManager { @Override public Appender getAppender(OverridableConf queryContext, - QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) + TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) throws IOException { if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) { return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java index 07f7988..36678e4 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; @@ -53,7 +53,7 @@ public class HFileAppender extends AbstractHBaseAppender { private Path workingFilePath; private FileOutputCommitter committer; - public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + public HFileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta, Path stagingDir) { super(conf, taskAttemptId, schema, meta, stagingDir); } @@ -66,10 +66,10 @@ public class HFileAppender extends AbstractHBaseAppender { Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString()); - ExecutionBlockId ebId = taskAttemptId.getQueryUnitId().getExecutionBlockId(); + ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId(); writerContext = new TaskAttemptContextImpl(taskConf, new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP, - taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId())); + taskAttemptId.getTaskId().getId(), taskAttemptId.getId())); HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2(); try { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java index 4bf4c99..28c263c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.*; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -76,7 +76,7 @@ public class CSVFile { private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); private SerializerDeserializer serde; - public CSVAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId, + public CSVAppender(Configuration conf, final TaskAttemptId taskAttemptId, final Schema schema, final TableMeta meta, final Path workDir) throws IOException { super(conf, taskAttemptId, schema, meta, workDir); this.fs = workDir.getFileSystem(conf); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java index 47f67c6..b208a71 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -22,7 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; @@ -38,12 +38,12 @@ public abstract class FileAppender implements Appender { protected final TableMeta meta; protected final Schema schema; protected final Path workDir; - protected final QueryUnitAttemptId taskAttemptId; + protected final TaskAttemptId taskAttemptId; protected boolean enabledStats; protected Path path; - public FileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, + public FileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta, Path workDir) { this.conf = conf; this.meta = meta; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java index 060bf16..3d4f7d5 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.tajo.OverridableConf; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; @@ -268,7 +268,7 @@ public class FileStorageManager extends StorageManager { ///////////////////////////////////////////////////////////////////////////// // FileInputFormat Area ///////////////////////////////////////////////////////////////////////////// - public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) { + public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) { if (taskAttemptId == null) { // For testcase return workDir; @@ -277,8 +277,8 @@ public class FileStorageManager extends StorageManager { // where ss is the subquery id associated with this task, and nnnnnn is the task id. Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, OUTPUT_FILE_PREFIX + - OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId()) + "-" + - OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + "-" + + OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" + + OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" + OUTPUT_FILE_FORMAT_SEQ.get().format(0)); LOG.info("Output File Path: " + outFilePath); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java index 33b2750..4c772c9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java @@ -21,7 +21,7 @@ package org.apache.tajo.storage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.util.Pair; @@ -42,7 +42,7 @@ public class HashShuffleAppender implements Appender { private TableStats tableStats; //<taskId,<page start offset,<task start, task end>>> - private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes; + private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes; //page start offset, length private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); @@ -69,7 +69,7 @@ public class HashShuffleAppender implements Appender { @Override public void init() throws IOException { currentPage = new Pair(0L, 0); - taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>(); + taskTupleIndexes = new HashMap<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>(); rowNumInPage = 0; } @@ -81,7 +81,7 @@ public class HashShuffleAppender implements Appender { * @return written bytes * @throws java.io.IOException */ - public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException { + public int addTuples(TaskAttemptId taskId, List<Tuple> tuples) throws IOException { synchronized(appender) { if (closed.get()) { return 0; @@ -189,7 +189,7 @@ public class HashShuffleAppender implements Appender { return pages; } - public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() { + public Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() { return taskTupleIndexes; } @@ -203,7 +203,7 @@ public class HashShuffleAppender implements Appender { return merged; } - public void taskFinished(QueryUnitAttemptId taskId) { + public void taskFinished(TaskAttemptId taskId) { taskTupleIndexes.remove(taskId); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index 636ae0f..74190bc 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; @@ -154,10 +154,10 @@ public class HashShuffleAppenderManager { return intermEntries; } - public void finalizeTask(QueryUnitAttemptId taskId) { + public void finalizeTask(TaskAttemptId taskId) { synchronized (appenderMap) { Map<Integer, PartitionAppenderMeta> partitionAppenderMap = - appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId()); + appenderMap.get(taskId.getTaskId().getExecutionBlockId()); if (partitionAppenderMap == null) { return; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index cb86f35..45e07d3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; @@ -465,7 +465,7 @@ public class RawFile { private TableStatistics stats; - public RawFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + public RawFileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta, Path workDir) throws IOException { super(conf, taskAttemptId, schema, meta, workDir); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java index 8da6ada..5510cbf 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; @@ -314,7 +314,7 @@ public class RowFile { // statistics private TableStatistics stats; - public RowFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId, + public RowFileAppender(Configuration conf, final TaskAttemptId taskAttemptId, final Schema schema, final TableMeta meta, final Path workDir) throws IOException { super(conf, taskAttemptId, schema, meta, workDir); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java index dbb8bd0..69399dc 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; @@ -61,7 +61,7 @@ public class AvroAppender extends FileAppender { * @param workDir The path of the Parquet file to write to. */ public AvroAppender(Configuration conf, - QueryUnitAttemptId taskAttemptId, + TaskAttemptId taskAttemptId, org.apache.tajo.catalog.Schema schema, TableMeta meta, Path workDir) throws IOException { super(conf, taskAttemptId, schema, meta, workDir); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java index b10d423..ef5203c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -18,7 +18,7 @@ package org.apache.tajo.storage.parquet; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.storage.StorageConstants; import parquet.hadoop.ParquetOutputFormat; import parquet.hadoop.metadata.CompressionCodecName; @@ -54,7 +54,7 @@ public class ParquetAppender extends FileAppender { * @param meta The table metadata. * @param workDir The path of the Parquet file to write to. */ - public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta, + public ParquetAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta, Path workDir) throws IOException { super(conf, taskAttemptId, schema, meta, workDir); this.blockSize = Integer.parseInt( http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index d88223b..2c09100 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -29,7 +29,7 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.io.compress.*; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; @@ -710,7 +710,7 @@ public class RCFile { return out.getPos(); } - public RCFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId, + public RCFileAppender(Configuration conf, final TaskAttemptId taskAttemptId, final Schema schema, final TableMeta meta, final Path workDir) throws IOException { super(conf, taskAttemptId, schema, meta, workDir); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java index 14e0f26..8b5d677 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java @@ -29,7 +29,7 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -74,7 +74,7 @@ public class SequenceFileAppender extends FileAppender { private Writable EMPTY_KEY; - public SequenceFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + public SequenceFileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, TableMeta meta, Path workDir) throws IOException { super(conf, taskAttemptId, schema, meta, workDir); this.meta = meta; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 8824e3e..59129d1 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -30,14 +30,13 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.storage.*; import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.exception.AlreadyExistsStorageException; -import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; import org.apache.tajo.util.ReflectionUtil; @@ -114,7 +113,7 @@ public class DelimitedTextFile { private NonSyncByteArrayOutputStream os; private TextLineSerializer serializer; - public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + public DelimitedTextFileAppender(Configuration conf, TaskAttemptId taskAttemptId, final Schema schema, final TableMeta meta, final Path path) throws IOException { super(conf, taskAttemptId, schema, meta, path);
