http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
new file mode 100644
index 0000000..15ead84
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.service.Service;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class TestHistory {
+  private TajoTestingCluster cluster;
+  private TajoMaster master;
+  private TajoConf conf;
+  private TajoClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    cluster = TpchTestBase.getInstance().getTestingCluster();
+    master = cluster.getMaster();
+    conf = cluster.getConfiguration();
+    client = new TajoClient(conf);
+  }
+
+  @After
+  public void tearDown() {
+    client.close();
+  }
+
+
+  @Test
+  public final void testTaskRunnerHistory() throws IOException, ServiceException, InterruptedException {
+    int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
+    client.executeQueryAndGetResult("select sleep(1) from lineitem");
+
+    Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
+    assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);
+
+    TajoWorker worker = cluster.getTajoWorkers().get(0);
+    TaskRunnerManager taskRunnerManager = worker.getWorkerContext().getTaskRunnerManager();
+    assertNotNull(taskRunnerManager);
+
+
+    Collection<TaskRunnerHistory> histories = taskRunnerManager.getExecutionBlockHistories();
+    assertTrue(histories.size() > 0);
+
+    TaskRunnerHistory history = histories.iterator().next();
+    assertEquals(Service.STATE.STOPPED, history.getState());
+
+    assertEquals(history, new TaskRunnerHistory(history.getProto()));
+  }
+
+  @Test
+  public final void testTaskHistory() throws IOException, ServiceException, InterruptedException {
+    int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
+    client.executeQueryAndGetResult("select sleep(1) from lineitem");
+
+    Collection<QueryInProgress> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
+    assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);
+
+    TajoWorker worker = cluster.getTajoWorkers().get(0);
+    TaskRunnerManager taskRunnerManager = worker.getWorkerContext().getTaskRunnerManager();
+    assertNotNull(taskRunnerManager);
+
+
+    Collection<TaskRunnerHistory> histories = taskRunnerManager.getExecutionBlockHistories();
+    assertTrue(histories.size() > 0);
+
+    TaskRunnerHistory history = histories.iterator().next();
+
+    assertTrue(history.size() > 0);
+    assertEquals(Service.STATE.STOPPED, history.getState());
+
+    Map.Entry<QueryUnitAttemptId, TaskHistory> entry =
+        history.getTaskHistoryMap().entrySet().iterator().next();
+
+    QueryUnitAttemptId queryUnitAttemptId = entry.getKey();
+    TaskHistory taskHistory = entry.getValue();
+
+    assertEquals(TajoProtos.TaskAttemptState.TA_SUCCEEDED, taskHistory.getState());
+    assertEquals(queryUnitAttemptId, taskHistory.getQueryUnitAttemptId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 652a8e9..17b9229 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.catalog.CatalogConstants;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -287,14 +286,18 @@ public class CSVFile {
     private boolean eof = false;
     private final byte[] nullChars;
     private SplitLineReader reader;
-    private ArrayList<Long> fileOffsets = new ArrayList<Long>();
-    private ArrayList<Integer> rowLengthList = new ArrayList<Integer>();
-    private ArrayList<Integer> startOffsets = new ArrayList<Integer>();
-    private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
+    private ArrayList<Long> fileOffsets;
+    private ArrayList<Integer> rowLengthList;
+    private ArrayList<Integer> startOffsets;
+    private NonSyncByteArrayOutputStream buffer;
     private SerializerDeserializer serde;
 
     @Override
     public void init() throws IOException {
+      fileOffsets = new ArrayList<Long>();
+      rowLengthList = new ArrayList<Integer>();
+      startOffsets = new ArrayList<Integer>();
+      buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
 
       // FileFragment information
       if(fs == null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e650223/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 0235ce9..8917f21 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -135,6 +135,7 @@ public class MergeScanner implements Scanner {
   public void close() throws IOException {
     if(currentScanner != null) {
       currentScanner.close();
+      currentScanner = null;
     }
     iterator = null;
     progress = 1.0f;

Reply via email to