Repository: tajo Updated Branches: refs/heads/branch-0.11.0 5e5c1af08 -> 7c7281de7
TAJO-1825: Remove zero length fragments when file length is zero. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7c7281de Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7c7281de Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7c7281de Branch: refs/heads/branch-0.11.0 Commit: 7c7281de7d7c8e533efa2e619961a261acca95e9 Parents: 5e5c1af Author: Jinho Kim <[email protected]> Authored: Thu Sep 10 10:03:23 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Thu Sep 10 10:03:23 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/client/v2/LegacyClientDelegate.java | 6 ++ .../apache/tajo/querymaster/TestQueryState.java | 88 ++++++++++++++++++++ .../java/org/apache/tajo/querymaster/Stage.java | 8 +- .../main/java/org/apache/tajo/util/JSPUtil.java | 4 + .../org/apache/tajo/storage/FileTablespace.java | 3 - .../apache/tajo/storage/TestFileTablespace.java | 49 +++++++++++ .../org/apache/tajo/storage/TestStorages.java | 44 ++++++++++ 8 files changed, 199 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/7c7281de/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9bb33bd..e6c9844 100644 --- a/CHANGES +++ b/CHANGES @@ -36,6 +36,8 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1825: Remove zero length fragments when file length is zero. (jinho) + TAJO-1828: tajo-daemon scripts should kill process after process can not stop gracefully. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/7c7281de/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java index 42edadf..0a2c6de 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java @@ -66,6 +66,12 @@ public class LegacyClientDelegate extends SessionConnection implements ClientDel } @Override + public void close() { + executor.shutdown(); + super.close(); + } + + @Override public int executeUpdate(String sql) throws TajoException { queryClient.updateQuery(sql); return 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/7c7281de/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java index a43491b..5175186 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestQueryState.java @@ -20,11 +20,13 @@ package org.apache.tajo.querymaster; import org.apache.tajo.*; import org.apache.tajo.annotation.NotThreadSafe; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.client.QueryStatus; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.util.history.StageHistory; import org.junit.AfterClass; @@ -32,6 +34,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.sql.ResultSet; import java.util.List; import static org.junit.Assert.*; @@ -100,4 +103,89 @@ public class TestQueryState { /* get status from TajoMaster */ assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState()); } + + @Test(timeout = 10000) + public void testEmptyTable() throws Exception { + //create empty table + client.executeQueryAndGetResult("create table lineitem_empty as select * from lineitem where l_orderkey = -1"); + TableStats stats = client.getTableDesc("lineitem_empty").getStats(); + assertEquals(0L, stats.getNumBytes().longValue()); + assertEquals(0L, stats.getNumRows().longValue()); + + String queryStr = "select count(*) from lineitem_empty"; + /* + Optimized master plan + ------------------------------------------------------------------------------- + Execution Block Graph (TERMINAL - eb_1441688509247_0002_000003) + ------------------------------------------------------------------------------- + |-eb_1441688509247_0002_000003 + |-eb_1441688509247_0002_000002 + |-eb_1441688509247_0002_000001 + ------------------------------------------------------------------------------- + Order of Execution + ------------------------------------------------------------------------------- + 1: eb_1441688509247_0002_000001 + 2: eb_1441688509247_0002_000002 + 3: eb_1441688509247_0002_000003 + ------------------------------------------------------------------------------- + + ======================================================= + Block Id: eb_1441688509247_0002_000001 [LEAF] + ======================================================= + + [Outgoing] + [q_1441688509247_0002] 1 => 2 (type=HASH_SHUFFLE, key=, num=1) + + GROUP_BY(5)() + => exprs: (count()) + => target list: ?count_1 (INT8) + => out schema:{(1) ?count_1 (INT8)} + => in schema:{(0) } + SCAN(0) on default.lineitem_empty + => target list: + => out schema: {(0) } + + */ + + ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); + QueryId queryId = new QueryId(res.getQueryId()); + + QueryStatus queryState = client.getQueryStatus(queryId); + while (!TajoClientUtil.isQueryComplete(queryState.getState())) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + fail("Query state : " + queryState); + } + queryState = client.getQueryStatus(queryId); + } + + ResultSet resultSet = client.getQueryResult(queryId); + assertTrue(resultSet.next()); + assertEquals(0, resultSet.getLong(1)); + + QueryInfo queryInfo = cluster.getMaster().getContext().getQueryJobManager().getFinishedQuery(queryId); + assertEquals(queryId, queryInfo.getQueryId()); + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, queryInfo.getQueryState()); + + QueryHistory history = cluster.getQueryHistory(queryId); + List<StageHistory> stages = history.getStageHistories(); + + assertFalse(stages.isEmpty()); + + for (StageHistory stage : stages) { + ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(stage.getExecutionBlockId()); + //find leaf stage + if (executionBlockId.getId() == 1) { + assertEquals(0, stage.getTotalScheduledObjectsCount()); + } else { + assertNotEquals(0, stage.getTotalScheduledObjectsCount()); + } + assertEquals(StageState.SUCCEEDED.toString(), stage.getState()); + } + + /* get status from TajoMaster */ + assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState()); + client.executeQueryAndGetResult("drop table lineitem_empty purge"); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7c7281de/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 0276cc2..1f98457 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -141,6 +141,10 @@ public class Stage implements EventHandler<StageEvent> { .addTransition(StageState.INITED, StageState.INITED, StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(StageState.INITED, + EnumSet.of(StageState.SUCCEEDED, StageState.FAILED), + StageEventType.SQ_STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) .addTransition(StageState.INITED, StageState.KILL_WAIT, StageEventType.SQ_KILL, new KillTasksTransition()) .addTransition(StageState.INITED, StageState.ERROR, @@ -845,8 +849,8 @@ public class Stage implements EventHandler<StageEvent> { LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled"); if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks - stage.finalizeStage(); - stage.complete(); + stage.eventHandler.handle( + new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); } else { if(stage.getSynchronizedState() == StageState.INITED) { stage.taskScheduler.start(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7c7281de/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index 1d93c3c..919b2c2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -287,7 +287,11 @@ public class JSPUtil { } static final DecimalFormat PERCENT_FORMAT = new DecimalFormat("###.#"); + public static String percentFormat(float value) { + if (Float.isInfinite(value) || Float.isNaN(value)) { + value = 0.0f; + } return PERCENT_FORMAT.format(value * 100.0f); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7c7281de/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index f79471a..f4c78b6 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -556,9 +556,6 @@ public class FileTablespace extends Tablespace { splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); } } - } else { - //for zero length files - splits.add(makeSplit(tableName, path, 0, length)); } } if(LOG.isDebugEnabled()){ http://git-wip-us.apache.org/repos/asf/tajo/blob/7c7281de/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java index 37fbfe4..cecd4dd 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java @@ -158,6 +158,55 @@ public class TestFileTablespace { } @Test + public void testZeroLengthSplit() throws Exception { + final Configuration conf = new HdfsConfiguration(); + String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build(); + cluster.waitClusterUp(); + TajoConf tajoConf = new TajoConf(conf); + tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo"); + + int testCount = 10; + Path tablePath = new Path("/testZeroLengthSplit"); + try { + DistributedFileSystem fs = cluster.getFileSystem(); + + // Create test partitions + List<Path> partitions = Lists.newArrayList(); + for (int i =0; i < testCount; i++){ + Path tmpFile = new Path(tablePath, String.valueOf(i)); + + //creates zero length file + DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 0, (short) 2, 0xDEADDEADl); + partitions.add(tmpFile); + } + + assertTrue(fs.exists(tablePath)); + FileTablespace space = new FileTablespace("testZeroLengthSplit", fs.getUri()); + space.init(new TajoConf(conf)); + assertEquals(fs.getUri(), space.getUri()); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age",Type.INT4); + schema.addColumn("name",Type.TEXT); + TableMeta meta = CatalogUtil.newTableMeta("TEXT"); + + List<Fragment> splits = Lists.newArrayList(); + // Get FileFragments in partition batch + splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()]))); + assertEquals(0, splits.size()); + fs.close(); + } finally { + cluster.shutdown(true); + } + } + + @Test public void testGetSplitWithBlockStorageLocationsBatching() throws Exception { final Configuration conf = new HdfsConfiguration(); String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7c7281de/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 1051b3b..66d86f2 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -40,6 +40,7 @@ import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatumFactory; import org.apache.tajo.exception.ValueTooLongForTypeCharactersException; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.rcfile.RCFile; import org.apache.tajo.storage.sequencefile.SequenceFileScanner; import org.apache.tajo.util.CommonTestingUtil; @@ -205,6 +206,49 @@ public class TestStorages { } @Test + public void testZeroRows() throws IOException { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("score", Type.FLOAT4); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.setOptions(CatalogUtil.newDefaultProperty(storeType)); + if (storeType.equalsIgnoreCase(BuiltinStorages.AVRO)) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_PROJECTION_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testZeroRows.data"); + FileTablespace sm = TablespaceManager.getLocalFs(); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + appender.close(); + + TableStats stat = appender.getStats(); + assertEquals(0, stat.getNumRows().longValue()); + + if(internalType || BuiltinStorages.TEXT.equals(storeType)) { + FileStatus fileStatus = fs.getFileStatus(tablePath); + assertEquals(0, fileStatus.getLen()); + } + + List<Fragment> splits = sm.getSplits("testZeroRows", meta, schema, testDir); + int tupleCnt = 0; + for (Fragment fragment : splits) { + Scanner scanner = sm.getScanner(meta, schema, fragment, schema); + scanner.init(); + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + } + + assertEquals(0, tupleCnt); + } + + @Test public void testRCFileSplitable() throws IOException { if (storeType.equalsIgnoreCase(BuiltinStorages.RCFILE)) { Schema schema = new Schema();
