http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java new file mode 100644 index 0000000..15ead84 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.service.Service; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.TpchTestBase; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.querymaster.QueryInProgress; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import static org.junit.Assert.*; + +public class TestHistory { + private TajoTestingCluster cluster; + private TajoMaster master; + private TajoConf conf; + private TajoClient client; + + @Before + public void setUp() throws Exception { + cluster = TpchTestBase.getInstance().getTestingCluster(); + master = cluster.getMaster(); + conf = cluster.getConfiguration(); + client = new TajoClient(conf); + } + + @After + public void tearDown() { + client.close(); + } + + + @Test + public final void testTaskRunnerHistory() throws IOException, ServiceException, InterruptedException { + int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size(); + client.executeQueryAndGetResult("select sleep(1) from lineitem"); + + Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); + assertTrue(finishedQueries.size() > beforeFinishedQueriesCount); + + TajoWorker worker = cluster.getTajoWorkers().get(0); + TaskRunnerManager taskRunnerManager = worker.getWorkerContext().getTaskRunnerManager(); + assertNotNull(taskRunnerManager); + + + Collection<TaskRunnerHistory> histories = taskRunnerManager.getExecutionBlockHistories(); + assertTrue(histories.size() > 0); + + TaskRunnerHistory history = histories.iterator().next(); + assertEquals(Service.STATE.STOPPED, history.getState()); + + assertEquals(history, new TaskRunnerHistory(history.getProto())); + } + + @Test + public final void testTaskHistory() throws IOException, ServiceException, InterruptedException { + int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size(); + client.executeQueryAndGetResult("select sleep(1) from lineitem"); + + Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); + assertTrue(finishedQueries.size() > beforeFinishedQueriesCount); + + TajoWorker worker = cluster.getTajoWorkers().get(0); + TaskRunnerManager taskRunnerManager = worker.getWorkerContext().getTaskRunnerManager(); + assertNotNull(taskRunnerManager); + + + Collection<TaskRunnerHistory> histories = taskRunnerManager.getExecutionBlockHistories(); + assertTrue(histories.size() > 0); + + TaskRunnerHistory history = histories.iterator().next(); + + assertTrue(history.size() > 0); + assertEquals(Service.STATE.STOPPED, history.getState()); + + Map.Entry<QueryUnitAttemptId, TaskHistory> entry = + history.getTaskHistoryMap().entrySet().iterator().next(); + + QueryUnitAttemptId queryUnitAttemptId = entry.getKey(); + TaskHistory taskHistory = entry.getValue(); + + assertEquals(TajoProtos.TaskAttemptState.TA_SUCCEEDED, taskHistory.getState()); + assertEquals(queryUnitAttemptId, taskHistory.getQueryUnitAttemptId()); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java index 652a8e9..17b9229 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.*; -import org.apache.tajo.catalog.CatalogConstants; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -287,14 +286,18 @@ public class CSVFile { private boolean eof = false; private final byte[] nullChars; private SplitLineReader reader; - private ArrayList<Long> fileOffsets = new ArrayList<Long>(); - private ArrayList<Integer> rowLengthList = new ArrayList<Integer>(); - private ArrayList<Integer> startOffsets = new ArrayList<Integer>(); - private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE); + private ArrayList<Long> fileOffsets; + private ArrayList<Integer> rowLengthList; + private ArrayList<Integer> startOffsets; + private NonSyncByteArrayOutputStream buffer; private SerializerDeserializer serde; @Override public void init() throws IOException { + fileOffsets = new ArrayList<Long>(); + rowLengthList = new ArrayList<Integer>(); + startOffsets = new ArrayList<Integer>(); + buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE); // FileFragment information if(fs == null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java index 0235ce9..8917f21 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -135,6 +135,7 @@ public class MergeScanner implements Scanner { public void close() throws IOException { if(currentScanner != null) { currentScanner.close(); + currentScanner = null; } iterator = null; progress = 1.0f;
