http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java new file mode 100644 index 0000000..f3b6936 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestJSPUtil.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TaskId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent;
import org.apache.tajo.querymaster.Task;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;

/**
 * Tests for two {@link JSPUtil} helpers: {@code sortTaskArray} and {@code getPageNavigationList}.
 */
public class TestJSPUtil {

  /**
   * Creates ten tasks with ids 0..9. Tasks 0..8 get a launch time of {@code id + 1} and a finish
   * time of {@code 2 * (id + 1)}; task 9 is left without any launch or finish time. The list is
   * shuffled and then sorted by "id" and by "runTime", ascending and descending, and the resulting
   * positions are verified.
   */
  @Test
  public void testSortTask() throws Exception {
    Configuration conf = new TajoConf();
    TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext scheduleContext =
        new TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext();
    ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId("eb_000001_00001_00001");

    List<Task> tasks = new ArrayList<Task>();
    for (int seq = 0; seq < 10; seq++) {
      Task task = new Task(conf, scheduleContext, new TaskId(ebId, seq), true, null);
      tasks.add(task);

      // The last task (id 9) deliberately keeps no launch/finish time.
      if (seq >= 9) {
        continue;
      }
      int launchTime = seq + 1;
      int runningTime = seq + 1;
      task.setLaunchTime(launchTime);
      task.setFinishTime(launchTime + runningTime);
    }

    Collections.shuffle(tasks);

    Task[] byIdAsc = sortedCopy(tasks, "id", "asc");
    for (int pos = 0; pos < 10; pos++) {
      assertEquals(pos, byIdAsc[pos].getId().getId());
    }

    Task[] byIdDesc = sortedCopy(tasks, "id", "desc");
    for (int pos = 0; pos < 10; pos++) {
      assertEquals(9 - pos, byIdDesc[pos].getId().getId());
    }

    Task[] byRunTimeAsc = sortedCopy(tasks, "runTime", "asc");
    assertEquals(0, byRunTimeAsc[0].getId().getId());
    assertEquals(9, byRunTimeAsc[9].getId().getId());

    Task[] byRunTimeDesc = sortedCopy(tasks, "runTime", "desc");
    assertEquals(8, byRunTimeDesc[0].getId().getId());
    assertEquals(9, byRunTimeDesc[9].getId().getId());
  }

  /**
   * Pages through a 35-element list ("Data1".."Data35") with a page size of 10 and checks the
   * size, first element and last element of pages 1 to 4.
   */
  @Test
  public void testGetPageNavigationList() {
    List<String> originList = new ArrayList<String>();
    for (int n = 1; n <= 35; n++) {
      originList.add("Data" + n);
    }

    assertPage(JSPUtil.getPageNavigationList(originList, 1, 10), 10, "Data1", "Data10");
    assertPage(JSPUtil.getPageNavigationList(originList, 2, 10), 10, "Data11", "Data20");
    assertPage(JSPUtil.getPageNavigationList(originList, 3, 10), 10, "Data21", "Data30");
    assertPage(JSPUtil.getPageNavigationList(originList, 4, 10), 5, "Data31", "Data35");
  }

  /** Copies {@code tasks} into a fresh array and sorts that array with {@code JSPUtil.sortTaskArray}. */
  private static Task[] sortedCopy(List<Task> tasks, String sortField, String sortOrder) {
    Task[] taskArray = tasks.toArray(new Task[]{});
    JSPUtil.sortTaskArray(taskArray, sortField, sortOrder);
    return taskArray;
  }

  /** Asserts the size of {@code page} and the values of its first and last elements. */
  private static void assertPage(List<String> page, int expectedSize, String first, String last) {
    assertEquals(expectedSize, page.size());
    assertEquals(first, page.get(0));
    assertEquals(last, page.get(expectedSize - 1));
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java new file mode 100644 index 0000000..3d2578c --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.util.history;

import com.google.common.io.Files;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.*;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.util.TajoIdUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import static org.junit.Assert.*;

/**
 * Round-trip tests for {@link HistoryWriter} and {@link HistoryReader}: history records are
 * written, the expected files are looked up on the file system, and the records are read back.
 *
 * <p>In every test the writer is initialised and started inside the {@code try} block so that
 * {@code writer.stop()} in the {@code finally} block also runs when start-up fails.
 */
public class TestHistoryWriterReader extends QueryTestCaseBase {
  public static final String HISTORY_DIR = "/tmp/tajo-test-history";
  TajoConf tajoConf;

  /** Creates a configuration from the testing cluster and points the query history at {@link #HISTORY_DIR}. */
  @Before
  public void setUp() throws Exception {
    tajoConf = new TajoConf(testingCluster.getConfiguration());
    tajoConf.setVar(ConfVars.HISTORY_QUERY_DIR, HISTORY_DIR);
  }

  /** Recursively deletes the query history directory configured in {@link #tajoConf}. */
  @After
  public void tearDown() throws Exception {
    Path path = TajoConf.getQueryHistoryDir(tajoConf);
    FileSystem fs = path.getFileSystem(tajoConf);
    fs.delete(path, true);
  }

  /**
   * Writes two {@link QueryInfo} records, checks that exactly one {@code .hist} file exists under
   * {@code <yyyyMMdd>/query-list}, and reads both records back through the list API and by id.
   */
  @Test
  public void testQueryInfoReadAndWrite() throws Exception {
    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
    try {
      writer.init(tajoConf);
      writer.start();

      long startTime = System.currentTimeMillis();
      QueryInfo queryInfo1 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 1));
      queryInfo1.setStartTime(startTime);
      queryInfo1.setProgress(1.0f);
      queryInfo1.setQueryState(QueryState.QUERY_SUCCEEDED);
      writer.appendHistory(queryInfo1);

      QueryInfo queryInfo2 = new QueryInfo(QueryIdFactory.newQueryId(startTime, 2));
      queryInfo2.setStartTime(startTime);
      queryInfo2.setProgress(0.5f);
      queryInfo2.setQueryState(QueryState.QUERY_FAILED);
      writer.appendAndSync(queryInfo2);

      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));

      FileSystem fs = path.getFileSystem(tajoConf);
      Path parentPath = new Path(path, df.format(startTime) + "/query-list");
      FileStatus[] histFiles = fs.listStatus(parentPath);
      assertNotNull(histFiles);
      assertEquals(1, histFiles.length);
      assertTrue(histFiles[0].isFile());
      assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));

      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
      List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, 2);
      assertNotNull(queryInfos);
      assertEquals(2, queryInfos.size());

      // The second record written is expected at index 0 of the returned list.
      QueryInfo foundQueryInfo = queryInfos.get(0);
      assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId());
      assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
      assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);

      foundQueryInfo = reader.getQueryByQueryId(queryInfo2.getQueryId());
      assertEquals(queryInfo2.getQueryId(), foundQueryInfo.getQueryId());
      assertEquals(queryInfo2.getQueryState(), foundQueryInfo.getQueryState());
      assertEquals(queryInfo2.getProgress(), foundQueryInfo.getProgress(), 0);

      foundQueryInfo = queryInfos.get(1);
      assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
      assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
      assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);

      foundQueryInfo = reader.getQueryByQueryId(queryInfo1.getQueryId());
      assertEquals(queryInfo1.getQueryId(), foundQueryInfo.getQueryId());
      assertEquals(queryInfo1.getQueryState(), foundQueryInfo.getQueryState());
      assertEquals(queryInfo1.getProgress(), foundQueryInfo.getProgress(), 0);
    } finally {
      writer.stop();
    }
  }

  /**
   * Writes ten {@link QueryInfo} records (only the last one with {@code appendAndSync}) and checks
   * that {@code getQueriesInHistory} returns them in descending sequence order, both as a single
   * page of ten and as five pages of two.
   */
  @Test
  public void testQueryInfoPagination() throws Exception {
    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
    try {
      writer.init(tajoConf);
      writer.start();

      long startTime = System.currentTimeMillis();
      int testSize = 10;
      QueryInfo queryInfo;

      for (int i = 1; i < testSize + 1; i++) {
        queryInfo = new QueryInfo(QueryIdFactory.newQueryId(startTime, i));
        queryInfo.setStartTime(startTime);
        queryInfo.setProgress(1.0f);
        queryInfo.setQueryState(QueryState.QUERY_SUCCEEDED);

        if (testSize == i) {
          writer.appendAndSync(queryInfo);
        } else {
          writer.appendHistory(queryInfo);
        }
      }

      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));

      FileSystem fs = path.getFileSystem(tajoConf);
      Path parentPath = new Path(path, df.format(startTime) + "/query-list");
      FileStatus[] histFiles = fs.listStatus(parentPath);
      assertNotNull(histFiles);
      assertEquals(1, histFiles.length);
      assertTrue(histFiles[0].isFile());
      assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));

      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
      List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, testSize);
      assertNotNull(queryInfos);
      assertEquals(testSize, queryInfos.size());

      // the pagination api returns a descending ordered list
      for (int i = 0; i < testSize; i++) {
        assertEquals(testSize - i, queryInfos.get(i).getQueryId().getSeq());
      }

      int pages = 5;
      int pageSize = testSize / pages;
      int expectIdSequence = testSize;
      //min startIndex of page is 1
      for (int i = 1; i < pages + 1; i++) {
        queryInfos = reader.getQueriesInHistory(i, pageSize);
        assertNotNull(queryInfos);
        assertEquals(pageSize, queryInfos.size());

        for (QueryInfo qInfo : queryInfos) {
          assertEquals(expectIdSequence--, qInfo.getQueryId().getSeq());
        }
      }
    } finally {
      writer.stop();
    }
  }

  /**
   * Writes one {@link QueryHistory} with three stages of five tasks each, checks that
   * {@code query.hist} and one {@code <ebId>.hist} file per stage exist under
   * {@code <yyyyMMdd>/query-detail/<queryId>}, and reads the query, stage and task histories back.
   */
  @Test
  public void testQueryHistoryReadAndWrite() throws Exception {
    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);

    try {
      // Started inside the try block so that a failing init()/start() still reaches stop().
      writer.init(tajoConf);
      writer.start();

      long startTime = System.currentTimeMillis();

      QueryHistory queryHistory = new QueryHistory();
      QueryId queryId = QueryIdFactory.newQueryId(startTime, 1);
      queryHistory.setQueryId(queryId.toString());
      queryHistory.setLogicalPlan("LogicalPlan");
      List<StageHistory> stages = new ArrayList<StageHistory>();
      for (int i = 0; i < 3; i++) {
        ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId, i);
        StageHistory stageHistory = new StageHistory();
        stageHistory.setExecutionBlockId(ebId.toString());
        stageHistory.setStartTime(startTime + i);

        List<TaskHistory> taskHistories = new ArrayList<TaskHistory>();
        for (int j = 0; j < 5; j++) {
          TaskHistory taskHistory = new TaskHistory();
          taskHistory.setId(QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId), 1).toString());
          taskHistories.add(taskHistory);
        }
        stageHistory.setTasks(taskHistories);
        stages.add(stageHistory);
      }
      queryHistory.setStageHistories(stages);

      writer.appendAndSync(queryHistory);

      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));

      FileSystem fs = path.getFileSystem(tajoConf);

      assertTrue(fs.exists(new Path(path,
          df.format(startTime) + "/query-detail/" + queryId.toString() + "/query.hist")));
      for (int i = 0; i < 3; i++) {
        String ebId = QueryIdFactory.newExecutionBlockId(queryId, i).toString();
        assertTrue(fs.exists(new Path(path,
            df.format(startTime) + "/query-detail/" + queryId.toString() + "/" + ebId + ".hist")));
      }

      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
      QueryHistory foundQueryHistory = reader.getQueryHistory(queryId.toString());
      assertNotNull(foundQueryHistory);
      assertEquals(queryId.toString(), foundQueryHistory.getQueryId());
      assertEquals(3, foundQueryHistory.getStageHistories().size());

      for (int i = 0; i < 3; i++) {
        String ebId = QueryIdFactory.newExecutionBlockId(queryId, i).toString();
        StageHistory stageHistory = foundQueryHistory.getStageHistories().get(i);
        assertEquals(ebId, stageHistory.getExecutionBlockId());
        assertEquals(startTime + i, stageHistory.getStartTime());

        // TaskHistory is stored in the other file.
        assertNull(stageHistory.getTasks());

        List<TaskHistory> tasks = reader.getTaskHistory(queryId.toString(), ebId);
        assertNotNull(tasks);
        assertEquals(5, tasks.size());

        for (int j = 0; j < 5; j++) {
          TaskHistory taskHistory = tasks.get(j);
          assertEquals(stages.get(i).getTasks().get(j).getId(), taskHistory.getId());
        }
      }
    } finally {
      writer.stop();
    }
  }

  /**
   * Writes two worker-side task histories into a temporary {@code file://} history directory,
   * checks that {@code <yyyyMMdd>/tasks/127.0.0.1_28090} exists, reads both histories back by
   * attempt id, and verifies that an attempt id that was never written yields {@code null}.
   *
   * <p>This test uses its own local {@link TajoConf}, which shadows the {@link #tajoConf} field.
   */
  @Test
  public void testTaskHistoryReadAndWrite() throws Exception {
    TajoConf tajoConf = new TajoConf();
    File historyParentDir = Files.createTempDir();
    historyParentDir.deleteOnExit();
    tajoConf.setVar(ConfVars.HISTORY_TASK_DIR, "file://" + historyParentDir.getCanonicalPath());

    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", false);

    try {
      // Started inside the try block so that a failing init()/start() still reaches stop().
      writer.init(tajoConf);
      writer.start();

      // Write TaskHistory
      TableStatsProto tableStats = TableStatsProto.newBuilder()
          .setNumRows(10)
          .setNumBytes(100)
          .build();
      long startTime = System.currentTimeMillis() - 2000;
      TaskAttemptId id1 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000001_00");
      org.apache.tajo.worker.TaskHistory taskHistory1 = new org.apache.tajo.worker.TaskHistory(
          id1, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis(), tableStats);
      writer.appendHistory(taskHistory1);

      TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00");
      org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory(
          id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats);
      writer.appendAndSync(taskHistory2);

      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
      String startDate = df.format(new Date(startTime));
      Path taskParentPath = new Path(tajoConf.getVar(ConfVars.HISTORY_TASK_DIR),
          startDate.substring(0, 8) + "/tasks/127.0.0.1_28090");

      FileSystem fs = taskParentPath.getFileSystem(tajoConf);
      assertTrue(fs.exists(taskParentPath));

      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
      org.apache.tajo.worker.TaskHistory foundTaskHistory = reader.getTaskHistory(id1.toString(), startTime);
      assertNotNull(foundTaskHistory);
      assertEquals(id1, foundTaskHistory.getTaskAttemptId());
      assertEquals(taskHistory1, foundTaskHistory);

      foundTaskHistory = reader.getTaskHistory(id2.toString(), startTime);
      assertNotNull(foundTaskHistory);
      assertEquals(id2, foundTaskHistory.getTaskAttemptId());
      assertEquals(taskHistory2, foundTaskHistory);

      foundTaskHistory = reader.getTaskHistory("ta_1412326813565_0001_000001_000003_00", startTime);
      assertNull(foundTaskHistory);
    } finally {
      writer.stop();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java new file mode 100644 index 0000000..b70512c --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.util.metrics;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Tests the name matching of {@link GroupNameMetricsFilter} and {@link RegexpMetricsFilter}.
 * Every {@code matches} call passes {@code null} as its second argument.
 */
public class TestMetricsFilter {

  /**
   * A filter built for the group "tajomaster" must accept a metric name that starts with
   * "tajomaster." and reject names whose first segment differs or where "tajomaster" is not the
   * first segment.
   */
  @Test
  public void testGroupNameMetricsFilter() {
    GroupNameMetricsFilter filter = new GroupNameMetricsFilter("tajomaster");

    assertTrue(filter.matches("tajomaster.JVM.Heap.memFree", null));
    assertFalse(filter.matches("tajomaster01.JVM.Heap.memFree", null));
    assertFalse(filter.matches("server.tajomaster.JVM.Heap.memFree", null));
    assertFalse(filter.matches("tajworker.JVM.Heap.memFree", null));
  }

  /**
   * A filter built from the expressions "JVM" and "Query" must accept metric names containing
   * either of them and reject a name containing neither.
   */
  @Test
  public void testRegexpMetricsFilter() {
    List<String> filterExpressions = new ArrayList<String>();
    filterExpressions.add("JVM");
    filterExpressions.add("Query");

    RegexpMetricsFilter filter = new RegexpMetricsFilter(filterExpressions);

    assertTrue(filter.matches("tajomaster.JVM.Heap.memFree", null));
    assertTrue(filter.matches("tajomaster.Query.numQuery", null));

    assertFalse(filter.matches("tajomaster.resource.numWorker", null));
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java new file mode 100644 index 0000000..8751df9 --- /dev/null +++
b/tajo-core-tests/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util.metrics; + +import com.codahale.metrics.Counter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.metrics.Master; +import org.apache.tajo.metrics.MetricsUtil; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.metrics.reporter.TajoMetricsScheduledReporter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStreamReader; +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestSystemMetrics { + Path testPropertyFile; + Path metricsOutputFile; + @Before + public void setUp() throws Exception { + testPropertyFile = + new Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".properties"); + + metricsOutputFile = + new 
Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".out"); + + FileOutputStream out = new FileOutputStream(testPropertyFile.toUri().getPath()); + out.write("reporter.null=org.apache.tajo.util.metrics.reporter.NullReporter\n".getBytes()); + out.write("reporter.file=org.apache.tajo.util.metrics.reporter.MetricsFileScheduledReporter\n".getBytes()); + out.write("reporter.console=org.apache.tajo.util.metrics.reporter.MetricsConsoleScheduledReporter\n".getBytes()); + + out.write("MASTER-JVM.reporters=console\n".getBytes()); + out.write("MASTER.reporters=file\n".getBytes()); + out.write("test-console-group.reporters=console\n".getBytes()); + out.write("test-find-console-group.reporters=console,file\n".getBytes()); + + out.write(("MASTER.file.filename=" + metricsOutputFile.toUri().getPath() + "\n").getBytes()); + out.write("MASTER.file.period=5\n".getBytes()); + out.close(); + } + + @Test + public void testMetricsReporter() throws Exception { + TajoConf tajoConf = new TajoConf(); + tajoConf.set("tajo.metrics.property.file", testPropertyFile.toUri().getPath()); + TajoSystemMetrics tajoSystemMetrics = new TajoSystemMetrics(tajoConf, org.apache.tajo.metrics.Master.class, + "localhost"); + tajoSystemMetrics.start(); + + Collection<TajoMetricsScheduledReporter> reporters = tajoSystemMetrics.getMetricsReporters(); + + assertEquals(2, reporters.size()); + + TajoMetricsScheduledReporter reporter = reporters.iterator().next(); + assertEquals(5, reporter.getPeriod()); + + for(int i = 0; i < 10; i++) { + tajoSystemMetrics.counter(Master.Query.FAILED).inc(); + tajoSystemMetrics.counter(Master.Query.COMPLETED).inc(2); + tajoSystemMetrics.counter(Master.Cluster.ACTIVE_NODES).inc(3); + } + + SortedMap<String, Counter> counterMap = tajoSystemMetrics.getRegistry().getCounters(); + Counter counter1 = counterMap.get("MASTER.QUERY.FAILED"); + assertNotNull(counter1); + assertEquals(10, counter1.getCount()); + + Counter counter2 = counterMap.get("MASTER.QUERY.COMPLETED"); + 
assertNotNull(counter2); + assertEquals(20, counter2.getCount()); + + Counter counter3 = counterMap.get("MASTER.CLUSTER.ACTIVE_NODES"); + assertNotNull(counter3); + assertEquals(30, counter3.getCount()); + + //test findMetricsItemGroup method + Map<String, Map<String, Counter>> groupItems = reporter.findMetricsItemGroup(counterMap); + assertEquals(2, groupItems.size()); + + Map<String, Counter> group01Items = groupItems.get(MetricsUtil.getCanonicalContextName(Master.Query.class)); + assertEquals(2, group01Items.size()); + + counter1 = group01Items.get(Master.Query.FAILED.name()); + assertNotNull(counter1); + assertEquals(10, counter1.getCount()); + + counter2 = group01Items.get(Master.Query.COMPLETED.name()); + assertNotNull(counter2); + assertEquals(20, counter2.getCount()); + + Map<String, Counter> group02Items = groupItems.get(MetricsUtil.getCanonicalContextName(Master.Cluster.class)); + assertEquals(1, group02Items.size()); + + reporter.report(); + + BufferedReader reader = new BufferedReader(new InputStreamReader( + new FileInputStream(metricsOutputFile.toUri().getPath()))); + + String line; + + List<String> lines = new ArrayList<String>(); + while((line = reader.readLine()) != null) { + lines.add(line); + } + + assertEquals(2, lines.size()); + tajoSystemMetrics.stop(); + } + + @After + public void tearDown() throws Exception { + FileSystem fs = testPropertyFile.getFileSystem(new Configuration()); + fs.delete(testPropertyFile, false); + fs.delete(metricsOutputFile, false); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java new file mode 100644 index 0000000..7d7fb1a --- /dev/null +++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.worker;

import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
import org.apache.tajo.TaskAttemptId;

import java.io.IOException;

/**
 * Test double for {@link ExecutionBlockContext} whose {@link #init()} and
 * {@link #fatalError(TaskAttemptId, String)} do nothing.
 */
public class MockExecutionBlock extends ExecutionBlockContext {

  /**
   * Delegates to the parent constructor, passing {@code null} as its third argument.
   *
   * @param workerContext worker context handed to the parent constructor
   * @param request       execution block response handed to the parent constructor
   * @throws IOException declared because the parent constructor is invoked
   */
  public MockExecutionBlock(final TajoWorker.WorkerContext workerContext,
                            final ExecutionBlockContextResponse request) throws IOException {
    super(workerContext, request, null);
  }

  /** Intentionally empty: the mock skips initialisation. */
  @Override
  public void init() throws Throwable {
    // nothing to initialise
  }

  /** Intentionally empty: fatal errors are ignored by the mock. */
  @Override
  public void fatalError(final TaskAttemptId taskAttemptId, final String message) {
    // ignored
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java new file mode 100644 index 0000000..8c8427d --- /dev/null +++
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.worker;

import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.ResourceProtos.TaskAllocationProto;
import org.apache.tajo.ResourceProtos.TaskRequestProto;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.resource.NodeResource;
import org.apache.tajo.resource.NodeResources;
import org.apache.tajo.worker.event.NodeResourceEvent;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

/**
 * {@link NodeResourceManager} for tests. It releases a semaphore permit after every handled
 * event and can be told to suppress {@link #startTask(TaskRequestProto, NodeResource)}.
 */
public class MockNodeResourceManager extends NodeResourceManager {

  volatile boolean enableTaskHandlerEvent = true;
  private final Semaphore barrier;

  /**
   * @param barrier       semaphore that receives one permit per handled event
   * @param dispatcher    passed through to the parent constructor
   * @param workerContext passed through to the parent constructor
   */
  public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
    super(dispatcher, workerContext);
    this.barrier = barrier;
  }

  /** Lets the parent handle the event, then releases one permit on the barrier. */
  @Override
  public void handle(NodeResourceEvent event) {
    super.handle(event);
    barrier.release();
  }

  /** Forwards to the parent only while the task handler event flag is enabled. */
  @Override
  protected void startTask(TaskRequestProto request, NodeResource resource) {
    if (!enableTaskHandlerEvent) {
      return;
    }
    super.startTask(request, resource);
  }

  /**
   * skip task execution and deallocation for testing
   *
   * @param flag when {@code false}, {@link #startTask(TaskRequestProto, NodeResource)} does nothing
   */
  public void setTaskHandlerEvent(boolean flag) {
    enableTaskHandlerEvent = flag;
  }

  /**
   * Builds {@code size} task allocations for the given execution block, with task ids
   * {@code 0..size-1}, attempt id 0 and a resource created from {@code memory}.
   *
   * @param ebId   execution block the tasks belong to
   * @param memory value passed to {@code NodeResources.createResource}
   * @param size   number of allocations to create
   * @return a queue holding the allocations in ascending task-id order
   */
  protected static Queue<TaskAllocationProto> createTaskRequests(
      ExecutionBlockId ebId, int memory, int size) {

    Queue<TaskAllocationProto> allocations = new LinkedBlockingQueue<TaskAllocationProto>();
    for (int taskSeq = 0; taskSeq < size; taskSeq++) {
      allocations.add(newAllocation(ebId, taskSeq, memory));
    }
    return allocations;
  }

  /** Builds a single allocation: a task request for attempt 0 of task {@code taskSeq} plus its resource. */
  private static TaskAllocationProto newAllocation(ExecutionBlockId ebId, int taskSeq, int memory) {
    TaskAttemptId attemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, taskSeq), 0);

    TaskRequestProto taskRequest = TaskRequestProto.newBuilder()
        .setQueryMasterHostAndPort("localhost:0")
        .setId(attemptId.getProto())
        .setOutputTable("")
        .setPlan(PlanProto.LogicalNodeTree.newBuilder())
        .setClusteredOutput(false)
        .build();

    return TaskAllocationProto.newBuilder()
        .setResource(NodeResources.createResource(memory).getProto())
        .setTaskRequest(taskRequest)
        .build();
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java new file mode 100644 index 0000000..634398f --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.worker;

import com.google.common.collect.Maps;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
import org.apache.tajo.resource.NodeResource;
import org.apache.tajo.resource.NodeResources;

import java.net.ConnectException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.apache.tajo.ResourceProtos.*;

/**
 * {@link NodeStatusUpdater} for tests. Instead of a remote stub it talks to an in-memory
 * {@link MockResourceTracker} that records worker registrations and heartbeats.
 */
public class MockNodeStatusUpdater extends NodeStatusUpdater {

  private CountDownLatch barrier;
  // worker id -> total resource reported when the worker registered
  private Map<Integer, NodeResource> membership = Maps.newHashMap();
  // worker id -> available resource, updated by later heartbeats
  private Map<Integer, NodeResource> resources = Maps.newHashMap();
  private MockResourceTracker resourceTracker;

  /**
   * @param barrier       latch counted down once per received heartbeat
   * @param workerContext passed through to the parent constructor
   */
  public MockNodeStatusUpdater(CountDownLatch barrier, TajoWorker.WorkerContext workerContext) {
    super(workerContext);
    this.barrier = barrier;
    this.resourceTracker = new MockResourceTracker();
  }

  /** Returns the in-memory tracker instead of creating a stub. */
  @Override
  protected TajoResourceTrackerProtocolService.Interface newStub()
      throws NoSuchMethodException, ConnectException, ClassNotFoundException {

    return resourceTracker;
  }

  /** @return the in-memory tracker that receives this updater's heartbeats */
  protected MockResourceTracker getResourceTracker() {
    return resourceTracker;
  }

  /** In-memory resource tracker that keeps per-worker total and available resources. */
  class MockResourceTracker implements TajoResourceTrackerProtocolService.Interface {
    private NodeHeartbeatRequest lastRequest;

    /** @return the map of total resources recorded at registration, keyed by worker id */
    protected Map<Integer, NodeResource> getTotalResource() {
      return membership;
    }

    /**
     * @return the map of available resources, keyed by worker id (previously this returned the
     *         total-resource map by mistake)
     */
    protected Map<Integer, NodeResource> getAvailableResource() {
      return resources;
    }

    /** @return the most recently received heartbeat request, or {@code null} if there was none */
    protected NodeHeartbeatRequest getLastRequest() {
      return lastRequest;
    }

    /**
     * Handles one heartbeat.
     * <ul>
     *   <li>Known worker: if the request carries an available resource, the stored available
     *       resource is updated; the response command is {@code NORMAL}.</li>
     *   <li>Unknown worker with connection info: its total and available resources are recorded;
     *       the response command is {@code NORMAL}.</li>
     *   <li>Unknown worker without connection info: the response command is {@code MEMBERSHIP}.</li>
     * </ul>
     * Afterwards the request is remembered as the last request and the barrier is counted down.
     */
    @Override
    public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequest request,
                              RpcCallback<NodeHeartbeatResponse> done) {

      NodeHeartbeatResponse.Builder response = NodeHeartbeatResponse.newBuilder();
      if (membership.containsKey(request.getWorkerId())) {
        if (request.hasAvailableResource()) {
          NodeResource resource = resources.get(request.getWorkerId());
          NodeResources.update(resource, new NodeResource(request.getAvailableResource()));
        }
        done.run(response.setCommand(ResponseCommand.NORMAL).build());
      } else {
        if (request.hasConnectionInfo()) {
          membership.put(request.getWorkerId(), new NodeResource(request.getTotalResource()));
          resources.put(request.getWorkerId(), new NodeResource(request.getAvailableResource()));
          done.run(response.setCommand(ResponseCommand.NORMAL).build());
        } else {
          done.run(response.setCommand(ResponseCommand.MEMBERSHIP).build());
        }
      }
      lastRequest = request;
      barrier.countDown();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java new file mode 100644 index 0000000..071d26a --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.worker;

import org.apache.tajo.ResourceProtos.TaskRequestProto;
import org.apache.tajo.ResourceProtos.TaskStatusProto;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.worker.event.TaskStartEvent;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

/**
 * A {@link TaskExecutor} for tests. It releases one permit on the supplied
 * semaphore after each {@link TaskStartEvent} is handled, and it creates
 * stub tasks that do no work and simply report success when run.
 */
public class MockTaskExecutor extends TaskExecutor {

  /** Receives one permit per handled task start event. */
  protected final Semaphore barrier;

  /**
   * @param barrier       semaphore released after each handled start event
   * @param workerContext worker context handed to the parent executor
   */
  public MockTaskExecutor(Semaphore barrier, TajoWorker.WorkerContext workerContext) {
    super(workerContext);
    this.barrier = barrier;
  }

  /**
   * Delegates to the parent handler, then releases one permit.
   */
  @Override
  public void handle(TaskStartEvent event) {
    super.handle(event);
    barrier.release();
  }

  /**
   * Builds a stub task for the given request. Running it stops its context,
   * sets the progress to 1.0 and marks the attempt TA_SUCCEEDED.
   */
  @Override
  protected Task createTask(final ExecutionBlockContext context, TaskRequestProto taskRequest) {
    final TaskAttemptId attemptId = new TaskAttemptId(taskRequest.getId());

    // The state lives in a plain field of this subclass; the original note
    // says this is done to ignore the status-changed log.
    final TaskAttemptContext attemptContext =
        new TaskAttemptContext(null, context, attemptId, null, null) {
          private TajoProtos.TaskAttemptState currentState;

          @Override
          public void setState(TajoProtos.TaskAttemptState state) {
            currentState = state;
          }

          @Override
          public TajoProtos.TaskAttemptState getState() {
            return currentState;
          }
        };

    return new Task() {

      // ---- life-cycle hooks that intentionally do nothing ----

      @Override
      public void init() throws IOException {
      }

      @Override
      public void fetch(ExecutorService executorService) {
      }

      @Override
      public void kill() {
      }

      @Override
      public void abort() {
      }

      @Override
      public void cleanup() {
      }

      @Override
      public void updateProgress() {
      }

      // ---- the only "work": report immediate success ----

      @Override
      public void run() throws Exception {
        attemptContext.stop();
        attemptContext.setProgress(1.0f);
        attemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
      }

      // ---- status queries ----

      @Override
      public boolean hasFetchPhase() {
        return false;
      }

      @Override
      public boolean isProgressChanged() {
        return false;
      }

      @Override
      public boolean isStopped() {
        return attemptContext.isStopped();
      }

      @Override
      public TaskAttemptContext getTaskContext() {
        return attemptContext;
      }

      @Override
      public ExecutionBlockContext getExecutionBlockContext() {
        return context;
      }

      @Override
      public TaskStatusProto getReport() {
        return TaskStatusProto.newBuilder()
            .setWorkerName("localhost:0")
            .setId(attemptContext.getTaskId().getProto())
            .setProgress(attemptContext.getProgress())
            .setState(attemptContext.getState())
            .setInputStats(new TableStats().getProto())
            .build();
      }

      @Override
      public TaskHistory createTaskHistory() {
        return null;
      }

      @Override
      public List<Fetcher> getFetchers() {
        return null;
      }
    };
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java new file mode 100644 index 0000000..76ce9f7 --- /dev/null +++
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.worker;

import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
import org.apache.tajo.ResourceProtos.ExecutionBlockListProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.worker.event.TaskManagerEvent;

import java.io.IOException;
import java.util.concurrent.Semaphore;

/**
 * A {@link TaskManager} for tests. Execution blocks are built locally from a
 * canned response, stopping a block is a no-op, and one semaphore permit is
 * released after every handled {@link TaskManagerEvent}.
 */
public class MockTaskManager extends TaskManager {

  /** Receives one permit per handled event. */
  private final Semaphore barrier;

  /**
   * @param barrier       semaphore released after each handled event
   * @param dispatcher    dispatcher handed to the parent manager
   * @param workerContext worker context handed to the parent manager
   */
  public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
    super(dispatcher, workerContext);
    this.barrier = barrier;
  }

  /**
   * Delegates to the parent handler, then releases one permit.
   */
  @Override
  public void handle(TaskManagerEvent event) {
    super.handle(event);
    barrier.release();
  }

  /**
   * Creates a {@link MockExecutionBlock} from a fixed response; the
   * {@code queryMaster} argument is not used.
   *
   * @throws RuntimeException wrapping any IOException raised while building the block
   */
  @Override
  protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId executionBlockId, String queryMaster) {
    try {
      ExecutionBlockContextResponse response = ExecutionBlockContextResponse.newBuilder()
          .setExecutionBlockId(executionBlockId.getProto())
          .setPlanJson("test")
          .setQueryContext(new QueryContext(new TajoConf()).getProto())
          .setQueryOutputPath("testpath")
          .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE)
          .build();
      return new MockExecutionBlock(getWorkerContext(), response);
    } catch (IOException cause) {
      throw new RuntimeException(cause);
    }
  }

  /**
   * Intentionally empty: execution blocks are not torn down in tests.
   */
  @Override
  protected void stopExecutionBlock(ExecutionBlockContext context,
                                    ExecutionBlockListProto cleanupList) {
    //skip for testing
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java new file mode 100644 index 0000000..25f3dca --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockWorkerContext.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.tajo.worker;

import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.querymaster.QueryMaster;
import org.apache.tajo.querymaster.QueryMasterManagerService;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.history.HistoryReader;
import org.apache.tajo.util.history.HistoryWriter;
import org.apache.tajo.util.metrics.TajoSystemMetrics;

/**
 * Skeleton {@link TajoWorker.WorkerContext} for tests. Subclasses supply the
 * configuration (and whatever else they need); every accessor implemented
 * here returns {@code null} or does nothing, except {@link #getMetrics()},
 * which lazily creates and starts a metrics system.
 */
public abstract class MockWorkerContext implements TajoWorker.WorkerContext {
  /** Created on the first {@link #getMetrics()} call. */
  TajoSystemMetrics tajoSystemMetrics;

  /**
   * @return the configuration used by this context; provided by the subclass
   */
  public abstract TajoConf getConf();

  /**
   * Returns the metrics system, creating and starting it on first use.
   */
  @Override
  public TajoSystemMetrics getMetrics() {
    if (tajoSystemMetrics != null) {
      return tajoSystemMetrics;
    }

    tajoSystemMetrics = new TajoSystemMetrics(getConf(), org.apache.tajo.metrics.Node.class, "localhost");
    tajoSystemMetrics.start();
    return tajoSystemMetrics;
  }

  // ---- accessors that are not backed by anything in this mock ----

  @Override
  public QueryMaster getQueryMaster() {
    return null;
  }

  @Override
  public QueryMasterManagerService getQueryMasterManagerService() {
    return null;
  }

  @Override
  public ServiceTracker getServiceTracker() {
    return null;
  }

  @Override
  public CatalogService getCatalog() {
    return null;
  }

  @Override
  public WorkerConnectionInfo getConnectionInfo() {
    return null;
  }

  @Override
  public String getWorkerName() {
    return null;
  }

  @Override
  public LocalDirAllocator getLocalDirAllocator() {
    return null;
  }

  @Override
  public HashShuffleAppenderManager getHashShuffleAppenderManager() {
    return null;
  }

  @Override
  public HistoryWriter getTaskHistoryWriter() {
    return null;
  }

  @Override
  public HistoryReader getHistoryReader() {
    return null;
  }

  // ---- clean-up hooks: nothing to clean in the mock ----

  @Override
  public void cleanup(String strPath) {
  }

  @Override
  public void cleanupTemporalDirectories() {
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java new file mode 100644 index 0000000..98251c1 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestDeletionService.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.tajo.worker;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Checks that {@link DeletionService} removes a directory only after its
 * configured delay has passed.
 */
public class TestDeletionService {
  /** Service under test; created by the test method, stopped in {@link #tearDown()}. */
  DeletionService deletionService;

  @Before
  public void setUp() throws Exception {
  }

  /**
   * Stops the service if the test created one.
   */
  @After
  public void tearDown() {
    if (deletionService == null) {
      return;
    }
    deletionService.stop();
  }

  /**
   * Schedules a test directory for deletion, verifies it still exists right
   * after the request, and verifies it is gone after waiting twice the delay.
   */
  @Test
  public final void testTemporalDirectory() throws IOException, InterruptedException {
    // presumably seconds, since the wait below is delay * 2 * 1000 ms -- confirm against DeletionService
    final int delaySeconds = 1;
    deletionService = new DeletionService(1, delaySeconds);

    final FileSystem localFs = FileSystem.getLocal(new Configuration());
    final Path scratchDir = CommonTestingUtil.getTestDir();
    assertTrue(localFs.exists(scratchDir));

    deletionService.delete(scratchDir);
    // deletion is deferred, so the directory must still be present here
    assertTrue(localFs.exists(scratchDir));

    Thread.sleep(delaySeconds * 2 * 1000);
    assertFalse(localFs.exists(scratchDir));
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java new file mode 100644 index 0000000..a91fc30 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.worker;

import org.apache.hadoop.fs.*;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.*;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Random;

import static org.junit.Assert.*;

/**
 * Tests {@link Fetcher} against a {@link TajoPullServerService} started
 * in-process for each test: successful fetches, state transitions, an
 * invalid shuffle type, and a stopped server.
 */
public class TestFetcher {
  private String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFetcher";
  private String INPUT_DIR = TEST_DATA+"/in/";
  private String OUTPUT_DIR = TEST_DATA+"/out/";
  private TajoConf conf = new TajoConf();
  private TajoPullServerService pullServerService;

  /**
   * Prepares the test directories, points the worker temporal dir at the input
   * directory and starts a pull server.
   */
  @Before
  public void setUp() throws Exception {
    CommonTestingUtil.getTestDir(TEST_DATA);
    CommonTestingUtil.getTestDir(INPUT_DIR);
    CommonTestingUtil.getTestDir(OUTPUT_DIR);
    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);

    pullServerService = new TajoPullServerService();
    pullServerService.init(conf);
    pullServerService.start();
  }

  @After
  public void tearDown(){
    pullServerService.stop();
  }

  /**
   * Writes 100 chunks of {@code prefix + <random int>} to {@code path} on the
   * local file system, overwriting any existing file. The stream is closed
   * even when a write fails.
   */
  private void writeRandomData(Path path, String prefix) throws IOException {
    Random rnd = new Random();
    FSDataOutputStream stream = FileSystem.getLocal(conf).create(path, true);
    try {
      for (int i = 0; i < 100; i++) {
        String data = prefix + rnd.nextInt();
        stream.write(data.getBytes());
      }
      stream.flush();
    } finally {
      stream.close();
    }
  }

  /**
   * Creates a fetcher that requests {@code params} from the local pull server
   * and stores the result in {@code OUTPUT_DIR + "data"}.
   */
  private Fetcher newFetcher(String params) throws IOException {
    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
    FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
    storeChunk.setFromRemote(true);
    return new Fetcher(conf, uri, storeChunk);
  }

  /**
   * Fetches a hash-shuffle partition and checks that the stored file has the
   * same length as the input and that the fetcher finished.
   */
  @Test
  public void testGet() throws IOException {
    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
    String sid = "1";
    String partId = "1";

    int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
    String dataPath = conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) +
       queryId.toString() + "/output/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;

    String params = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, sid, partId, "h");

    Path inputPath = new Path(dataPath);
    writeRandomData(inputPath, "");

    final Fetcher fetcher = newFetcher(params);
    FileChunk chunk = fetcher.get();
    assertNotNull(chunk);
    assertNotNull(chunk.getFile());

    FileSystem fs = FileSystem.getLocal(new TajoConf());
    FileStatus inStatus = fs.getFileStatus(inputPath);
    FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));

    assertEquals(inStatus.getLen(), outStatus.getLen());
    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
  }

  /**
   * Pins the values returned by {@code TaskImpl.adjustFetchProcess} for a set
   * of (total, remaining) pairs.
   */
  @Test
  public void testAdjustFetchProcess() {
    assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
    assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
    assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
    assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
    assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
    assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
    assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
  }

  /**
   * A fetcher starts in FETCH_INIT and ends in FETCH_FINISHED after a
   * successful {@code get()}.
   */
  @Test
  public void testStatus() throws Exception {
    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
    String sid = "1";
    String ta = "1_0";
    String partId = "1";

    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);

    writeRandomData(new Path(dataPath), "");

    final Fetcher fetcher = newFetcher(params);
    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());

    fetcher.get();
    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
  }

  /**
   * A fetch for which the partition file does not exist still ends in
   * FETCH_FINISHED. Only an empty file at the partition's parent path is
   * created beforehand.
   */
  @Test
  public void testNoContentFetch() throws Exception {

    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
    String sid = "1";
    String ta = "1_0";
    String partId = "1";

    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);

    Path inputPath = new Path(dataPath);
    FileSystem fs = FileSystem.getLocal(conf);
    if(fs.exists(inputPath)){
      fs.delete(new Path(dataPath), true);
    }

    // creates (and immediately closes) an empty file at the parent path
    FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(dataPath).getParent(), true);
    stream.close();

    final Fetcher fetcher = newFetcher(params);
    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());

    fetcher.get();
    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
  }

  /**
   * A request with an unknown shuffle type ends in FETCH_FAILED.
   */
  @Test
  public void testFailureStatus() throws Exception {
    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
    String sid = "1";
    String ta = "1_0";
    String partId = "1";

    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;

    //TajoPullServerService will be throws BAD_REQUEST by Unknown shuffle type
    String shuffleType = "x";
    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, shuffleType, ta);

    // this test writes the query string in front of every random number
    writeRandomData(new Path(dataPath), params);

    final Fetcher fetcher = newFetcher(params);
    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());

    fetcher.get();
    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
  }

  /**
   * When the pull server is stopped before the fetch, {@code get()} must throw
   * and the fetcher must end in FETCH_FAILED.
   */
  @Test
  public void testServerFailure() throws Exception {
    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
    String sid = "1";
    String ta = "1_0";
    String partId = "1";

    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);

    final Fetcher fetcher = newFetcher(params);
    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());

    pullServerService.stop();

    boolean failure = false;
    try{
      fetcher.get();
    } catch (Throwable e){
      // any failure type is accepted; the state assertion below is the real check
      failure = true;
    }
    assertTrue(failure);
    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java new file mode 100644 index 0000000..1193478 --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.worker;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
import org.apache.tajo.worker.event.NodeResourceEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.tajo.ResourceProtos.*;
import static org.junit.Assert.*;

/**
 * Tests resource allocation, cancellation and deallocation of the node
 * resource manager, using mock task manager / executor / status updater
 * services wired together in a {@link CompositeService}.
 */
public class TestNodeResourceManager {

  private MockNodeResourceManager resourceManager;
  private NodeStatusUpdater statusUpdater;
  private TaskManager taskManager;
  private TaskExecutor taskExecutor;
  private AsyncDispatcher dispatcher;
  private AsyncDispatcher taskDispatcher;
  private TajoWorker.WorkerContext workerContext;

  private CompositeService service;
  private int taskMemory;
  private TajoConf conf;

  /**
   * Configures a worker with 4 cores and {@code taskMemory * cores} MB of
   * memory, builds the mock services and starts them.
   */
  @Before
  public void setup() {
    conf = new TajoConf();
    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);

    taskMemory = 512;
    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
        taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS, 4);
    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);

    dispatcher = new AsyncDispatcher();
    taskDispatcher = new AsyncDispatcher();

    // The context reads the fields below lazily, so it can be created before they are assigned.
    workerContext = new MockWorkerContext() {
      WorkerConnectionInfo workerConnectionInfo;
      @Override
      public TajoConf getConf() {
        return conf;
      }

      @Override
      public TaskManager getTaskManager() {
        return taskManager;
      }

      @Override
      public TaskExecutor getTaskExecuor() {
        return taskExecutor;
      }

      @Override
      public NodeResourceManager getNodeResourceManager() {
        return resourceManager;
      }

      @Override
      public WorkerConnectionInfo getConnectionInfo() {
        if (workerConnectionInfo == null) {
          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
        }
        return workerConnectionInfo;
      }
    };

    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext);
    taskExecutor = new MockTaskExecutor(new Semaphore(0), workerContext);
    resourceManager = new MockNodeResourceManager(new Semaphore(0), dispatcher, workerContext);
    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext);

    service = new CompositeService("MockService") {
      @Override
      protected void serviceInit(Configuration conf) throws Exception {
        addIfService(dispatcher);
        addIfService(taskDispatcher);
        addIfService(taskManager);
        addIfService(taskExecutor);
        addIfService(resourceManager);
        addIfService(statusUpdater);
        super.serviceInit(conf);
      }

      @Override
      protected void serviceStop() throws Exception {
        workerContext.getMetrics().stop();
        super.serviceStop();
      }
    };

    service.init(conf);
    service.start();
  }

  @After
  public void tearDown() {
    service.stop();
  }

  /**
   * Dispatches an allocation request and blocks until the response arrives.
   */
  private BatchAllocationResponse allocate(BatchAllocationRequest request) throws Exception {
    CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(request, callFuture));
    return callFuture.get();
  }

  /**
   * Builds an allocation request carrying exactly one task.
   */
  private static BatchAllocationRequest singleTaskRequest(ExecutionBlockId ebId, TaskAllocationProto task) {
    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
    requestProto.addTaskRequest(task);
    requestProto.setExecutionBlockId(ebId.getProto());
    return requestProto.build();
  }

  /**
   * Requesting as many tasks as the worker has capacity for allocates all of
   * them: nothing is cancelled and the available resource shrinks.
   */
  @Test
  public void testNodeResourceAllocateEvent() throws Exception {
    int requestSize = 4;
    resourceManager.setTaskHandlerEvent(false); //skip task execution

    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
    requestProto.setExecutionBlockId(ebId.getProto());

    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));

    BatchAllocationResponse responseProto = allocate(requestProto.build());
    assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
    // allocated all
    assertEquals(0, responseProto.getCancellationTaskCount());
  }


  /**
   * Requesting {@code overSize} more tasks than there are cores cancels
   * exactly {@code overSize} of them.
   */
  @Test
  public void testNodeResourceCancellation() throws Exception {
    int requestSize = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
    int overSize = 10;
    resourceManager.setTaskHandlerEvent(false); //skip task execution

    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
    requestProto.setExecutionBlockId(ebId.getProto());

    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
    requestProto.addAllTaskRequest(
        MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize + overSize));

    BatchAllocationResponse responseProto = allocate(requestProto.build());

    assertEquals(overSize, responseProto.getCancellationTaskCount());
  }

  /**
   * After allocating and then deallocating every task's resource, the
   * available resource equals the total again.
   */
  @Test
  public void testNodeResourceDeallocateEvent() throws Exception {
    int requestSize = 4;
    resourceManager.setTaskHandlerEvent(false); //skip task execution

    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
    requestProto.setExecutionBlockId(ebId.getProto());

    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
    requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));

    BatchAllocationResponse responseProto = allocate(requestProto.build());
    assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
    assertEquals(0, responseProto.getCancellationTaskCount());

    //deallocate
    for(TaskAllocationProto allocationRequestProto : requestProto.getTaskRequestList()) {
      // direct invoke handler for testing
      resourceManager.handle(new NodeResourceDeallocateEvent(
          allocationRequestProto.getResource(), NodeResourceEvent.ResourceType.TASK));
    }

    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
  }

  /**
   * Submits {@code taskSize} single-task requests from {@code 2 * cores}
   * threads. Cancelled tasks are put back on the queue and retried, so in the
   * end every task must have been allocated exactly once.
   */
  @Test(timeout = 30000)
  public void testParallelRequest() throws Exception {
    final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2;
    final int taskSize = 100000;
    resourceManager.setTaskHandlerEvent(true);

    final AtomicInteger totalComplete = new AtomicInteger();
    final AtomicInteger totalCanceled = new AtomicInteger();

    final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
    final Queue<TaskAllocationProto>
        totalTasks = MockNodeResourceManager.createTaskRequests(ebId, taskMemory, taskSize);

    // first request is issued from the test thread before going parallel
    TaskAllocationProto task = totalTasks.poll();
    assertEquals(0, allocate(singleTaskRequest(ebId, task)).getCancellationTaskCount());
    totalComplete.incrementAndGet();

    // start parallel request
    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
    try {
      List<Future<?>> futureList = Lists.newArrayList();

      for (int i = 0; i < parallelCount; i++) {
        futureList.add(executor.submit(new Runnable() {
              @Override
              public void run() {
                int complete = 0;
                while (true) {
                  TaskAllocationProto task = totalTasks.poll();
                  if (task == null) break;

                  try {
                    BatchAllocationResponse proto = allocate(singleTaskRequest(ebId, task));
                    if (proto.getCancellationTaskCount() > 0) {
                      // not allocated this time: requeue for another attempt
                      totalTasks.addAll(proto.getCancellationTaskList());
                      totalCanceled.addAndGet(proto.getCancellationTaskCount());
                    } else {
                      complete++;
                    }
                  } catch (Exception e) {
                    fail(e.getMessage());
                  }
                }
                totalComplete.addAndGet(complete);
              }
            })
        );
      }

      // a failure inside a worker surfaces here as an ExecutionException
      for (Future<?> future : futureList) {
        future.get();
      }
    } finally {
      // also reached when a worker failed, so the pool threads never outlive the test
      executor.shutdownNow();
    }

    assertEquals(taskSize, totalComplete.get());
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java new file mode 100644 index 0000000..ac4b7dd --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.event.NodeStatusEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; + +import static org.apache.tajo.ResourceProtos.NodeHeartbeatRequest; +import static org.junit.Assert.*; + +public class TestNodeStatusUpdater { + + private NodeResourceManager resourceManager; + private MockNodeStatusUpdater statusUpdater; + private MockTaskManager taskManager; + private AsyncDispatcher dispatcher; + private AsyncDispatcher taskDispatcher; + private CompositeService service; + private TajoConf conf; + private TajoWorker.WorkerContext workerContext; + + + @Before + public void setup() { + conf = new TajoConf(); + conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 2); + conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2); + + workerContext = new MockWorkerContext() { + WorkerConnectionInfo workerConnectionInfo; + + @Override + public TajoConf getConf() { + return conf; + } + + @Override + public TaskManager getTaskManager() { + return taskManager; + } + + @Override + public TaskExecutor getTaskExecuor() { + return null; + } + + @Override + public NodeResourceManager getNodeResourceManager() { + return resourceManager; + } + + @Override + public WorkerConnectionInfo getConnectionInfo() { + if (workerConnectionInfo == null) { + workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + } + return workerConnectionInfo; + } + }; + + 
conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL, 1000); + dispatcher = new AsyncDispatcher(); + resourceManager = new NodeResourceManager(dispatcher, workerContext); + taskDispatcher = new AsyncDispatcher(); + taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext) { + @Override + public int getRunningTasks() { + return 0; + } + }; + + service = new CompositeService("MockService") { + @Override + protected void serviceInit(Configuration conf) throws Exception { + addIfService(dispatcher); + addIfService(taskDispatcher); + addIfService(taskManager); + addIfService(resourceManager); + addIfService(statusUpdater); + super.serviceInit(conf); + } + + @Override + protected void serviceStop() throws Exception { + workerContext.getMetrics().stop(); + super.serviceStop(); + } + }; + + service.init(conf); + service.start(); + } + + @After + public void tearDown() { + service.stop(); + } + + @Test(timeout = 20000) + public void testNodeMembership() throws Exception { + CountDownLatch barrier = new CountDownLatch(1); + statusUpdater = new MockNodeStatusUpdater(barrier, workerContext); + statusUpdater.init(conf); + statusUpdater.start(); + + MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); + barrier.await(); + + assertTrue(resourceTracker.getTotalResource().containsKey(workerContext.getConnectionInfo().getId())); + assertEquals(resourceManager.getTotalResource(), + resourceTracker.getTotalResource().get(workerContext.getConnectionInfo().getId())); + + assertEquals(resourceManager.getAvailableResource(), + resourceTracker.getAvailableResource().get(workerContext.getConnectionInfo().getId())); + } + + @Test(timeout = 20000) + public void testPing() throws Exception { + CountDownLatch barrier = new CountDownLatch(2); + statusUpdater = new MockNodeStatusUpdater(barrier, workerContext); + statusUpdater.init(conf); + statusUpdater.start(); + + MockNodeStatusUpdater.MockResourceTracker 
resourceTracker = statusUpdater.getResourceTracker(); + barrier.await(); + + NodeHeartbeatRequest lastRequest = resourceTracker.getLastRequest(); + assertTrue(lastRequest.hasWorkerId()); + assertTrue(lastRequest.hasAvailableResource()); + assertTrue(lastRequest.hasRunningTasks()); + assertTrue(lastRequest.hasRunningQueryMasters()); + assertFalse(lastRequest.hasTotalResource()); + assertFalse(lastRequest.hasConnectionInfo()); + } + + @Test(timeout = 20000) + public void testResourceReport() throws Exception { + CountDownLatch barrier = new CountDownLatch(2); + statusUpdater = new MockNodeStatusUpdater(barrier, workerContext); + statusUpdater.init(conf); + statusUpdater.start(); + + assertEquals(0, statusUpdater.getQueueSize()); + for (int i = 0; i < statusUpdater.getQueueingThreshold(); i++) { + dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE)); + } + barrier.await(); + assertEquals(0, statusUpdater.getQueueSize()); + } + + @Test(timeout = 20000) + public void testFlushResourceReport() throws Exception { + CountDownLatch barrier = new CountDownLatch(2); + statusUpdater = new MockNodeStatusUpdater(barrier, workerContext); + statusUpdater.init(conf); + statusUpdater.start(); + + assertEquals(0, statusUpdater.getQueueSize()); + dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS)); + + barrier.await(); + assertEquals(0, statusUpdater.getQueueSize()); + } +}
