Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into hbase_storage
Conflicts:
tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/03c3ea29
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/03c3ea29
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/03c3ea29
Branch: refs/heads/hbase_storage
Commit: 03c3ea2904e1e821b24e8e441b91ea7fed55df35
Parents: 85627a5 55084a8
Author: HyoungJun Kim <[email protected]>
Authored: Wed Nov 12 15:37:57 2014 +0900
Committer: HyoungJun Kim <[email protected]>
Committed: Wed Nov 12 15:37:57 2014 +0900
----------------------------------------------------------------------
BUILDING | 2 +-
CHANGES | 28 +
README | 2 +-
.../org/apache/tajo/catalog/CatalogUtil.java | 12 +-
.../src/main/proto/CatalogProtos.proto | 1 +
.../tajo/catalog/store/HCatalogStore.java | 16 +-
.../apache/tajo/catalog/store/HCatalogUtil.java | 2 +-
.../tajo/catalog/store/TestHCatalogStore.java | 18 +-
tajo-catalog/tajo-catalog-server/pom.xml | 6 +
.../tajo/catalog/store/AbstractDBStore.java | 96 ++-
.../apache/tajo/catalog/store/DerbyStore.java | 330 +--------
.../apache/tajo/catalog/store/OracleStore.java | 337 +--------
.../tajo/catalog/store/PostgreSQLStore.java | 293 +-------
.../catalog/store/XMLCatalogSchemaManager.java | 698 +++++++++++++++++++
.../tajo/catalog/store/object/BaseSchema.java | 76 ++
.../catalog/store/object/DatabaseObject.java | 80 +++
.../store/object/DatabaseObjectType.java | 48 ++
.../tajo/catalog/store/object/SQLObject.java | 52 ++
.../tajo/catalog/store/object/SchemaPatch.java | 78 +++
.../tajo/catalog/store/object/StoreObject.java | 87 +++
.../resources/schemas/DBMSSchemaDefinition.xsd | 177 +++++
.../main/resources/schemas/derby/columns.sql | 8 -
.../main/resources/schemas/derby/databases.sql | 6 -
.../resources/schemas/derby/databases_idx.sql | 1 -
.../src/main/resources/schemas/derby/derby.xml | 186 +++++
.../main/resources/schemas/derby/indexes.sql | 12 -
.../schemas/derby/partition_methods.sql | 6 -
.../main/resources/schemas/derby/partitions.sql | 10 -
.../src/main/resources/schemas/derby/stats.sql | 6 -
.../schemas/derby/table_properties.sql | 6 -
.../src/main/resources/schemas/derby/tables.sql | 10 -
.../resources/schemas/derby/tablespaces.sql | 7 -
.../main/resources/schemas/oracle/oracle.xml | 218 ++++++
.../resources/schemas/postgresql/indexes.sql | 14 -
.../resources/schemas/postgresql/postgresql.xml | 203 ++++++
.../main/resources/schemas/postgresql/stats.sql | 6 -
.../store/TestXMLCatalogSchemaManager.java | 496 +++++++++++++
.../schemas/derbytest/loadtest/derby.xml | 191 +++++
.../derbytest/mergetest/base_version_1.xml | 35 +
.../derbytest/mergetest/base_version_2.xml | 63 ++
.../schemas/derbytest/querytest/derby.xml | 78 +++
.../derbytest/upgradetest/base_version_2.xml | 57 ++
.../cli/tsql/DefaultTajoCliOutputFormatter.java | 1 -
.../cli/tsql/commands/DescTableCommand.java | 1 -
.../org/apache/tajo/client/QueryClient.java | 6 +
.../org/apache/tajo/client/QueryClientImpl.java | 52 ++
.../org/apache/tajo/client/TajoClientImpl.java | 13 +-
tajo-client/src/main/proto/ClientProtos.proto | 60 ++
.../main/proto/QueryMasterClientProtocol.proto | 1 +
.../main/proto/TajoMasterClientProtocol.proto | 1 +
tajo-common/pom.xml | 4 -
.../main/java/org/apache/tajo/SessionVars.java | 7 +-
.../java/org/apache/tajo/conf/TajoConf.java | 28 +-
.../org/apache/tajo/storage/EmptyTuple.java | 176 +++++
.../apache/tajo/storage/StorageConstants.java | 9 +
.../tajo/util/datetime/DateTimeConstants.java | 22 +
.../tajo/util/datetime/DateTimeFormat.java | 2 +-
.../apache/tajo/util/datetime/DateTimeUtil.java | 96 ++-
.../apache/tajo/util/TestDateTimeFormat.java | 12 +-
.../org/apache/tajo/util/TestDateTimeUtil.java | 79 ++-
.../java/org/apache/tajo/benchmark/TPCH.java | 2 +-
.../datetime/DateTimePartFromUnixTimestamp.java | 18 +-
.../apache/tajo/engine/parser/SQLAnalyzer.java | 8 +-
.../engine/planner/PhysicalPlannerImpl.java | 2 +-
.../planner/physical/PhysicalPlanUtil.java | 41 +-
.../org/apache/tajo/master/GlobalEngine.java | 4 +
.../java/org/apache/tajo/master/TajoMaster.java | 21 +
.../tajo/master/TajoMasterClientService.java | 33 +
.../apache/tajo/master/querymaster/Query.java | 41 +-
.../master/querymaster/QueryInProgress.java | 3 +
.../tajo/master/querymaster/QueryInfo.java | 79 ++-
.../tajo/master/querymaster/QueryMaster.java | 89 +--
.../tajo/master/querymaster/QueryUnit.java | 79 +++
.../tajo/master/querymaster/SubQuery.java | 76 ++
.../main/java/org/apache/tajo/util/JSPUtil.java | 162 ++++-
.../org/apache/tajo/util/history/History.java | 27 +
.../tajo/util/history/HistoryCleaner.java | 136 ++++
.../apache/tajo/util/history/HistoryReader.java | 308 ++++++++
.../apache/tajo/util/history/HistoryWriter.java | 450 ++++++++++++
.../apache/tajo/util/history/QueryHistory.java | 151 ++++
.../tajo/util/history/QueryUnitHistory.java | 167 +++++
.../tajo/util/history/SubQueryHistory.java | 270 +++++++
.../tajo/webapp/QueryExecutorServlet.java | 17 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 20 +
.../tajo/worker/TajoWorkerClientService.java | 34 +
.../main/java/org/apache/tajo/worker/Task.java | 5 +-
.../org/apache/tajo/worker/TaskHistory.java | 8 +-
.../src/main/resources/webapps/admin/query.jsp | 72 +-
.../resources/webapps/admin/query_executor.jsp | 16 +-
.../resources/webapps/admin/querydetail.jsp | 116 +++
.../main/resources/webapps/admin/querytasks.jsp | 249 +++++++
.../main/resources/webapps/admin/queryunit.jsp | 134 ++++
.../resources/webapps/worker/querydetail.jsp | 43 +-
.../resources/webapps/worker/querytasks.jsp | 126 ++--
.../resources/webapps/worker/taskhistory.jsp | 123 ++++
.../test/java/org/apache/tajo/TpchTestBase.java | 2 +-
.../apache/tajo/cli/tools/TestDDLBuilder.java | 2 +-
.../org/apache/tajo/cli/tsql/TestTajoCli.java | 36 +
.../org/apache/tajo/client/TestTajoClient.java | 56 +-
.../engine/function/TestDateTimeFunctions.java | 15 +-
.../planner/physical/TestBSTIndexExec.java | 3 -
.../apache/tajo/engine/query/TestCTASQuery.java | 2 +-
.../tajo/engine/query/TestGroupByQuery.java | 17 +-
.../tajo/engine/query/TestJoinBroadcast.java | 4 +-
.../apache/tajo/engine/query/TestJoinQuery.java | 16 +-
.../tajo/engine/query/TestNullValues.java | 14 +-
.../tajo/engine/query/TestSelectQuery.java | 4 +-
.../apache/tajo/engine/query/TestSortQuery.java | 12 +-
.../tajo/engine/query/TestTablePartitions.java | 4 +-
.../org/apache/tajo/jdbc/TestResultSet.java | 2 +-
.../java/org/apache/tajo/util/TestJSPUtil.java | 37 +-
.../util/history/TestHistoryWriterReader.java | 251 +++++++
.../results/TestTajoCli/testDescTable.result | 4 +-
.../TestTajoCli/testNonForwardQueryPause.result | 5 +
.../results/TestTajoDump/testDump1.result | 2 +-
.../testBuildDDLForBaseTable.result | 2 +-
.../testBuildDDLForExternalTable.result | 2 +-
.../testBuildDDLQuotedTableName1.result | 2 +-
.../testBuildDDLQuotedTableName2.result | 2 +-
.../main/sphinx/getting_started/first_query.rst | 2 +-
.../sphinx/getting_started/prerequisites.rst | 6 +-
tajo-docs/src/main/sphinx/sql_language/ddl.rst | 2 +-
.../src/main/sphinx/table_management/csv.rst | 10 +-
tajo-docs/src/main/sphinx/tsql/variables.rst | 2 +-
tajo-project/pom.xml | 6 +-
tajo-rpc/pom.xml | 4 -
tajo-storage/pom.xml | 4 +
.../storage/BinarySerializerDeserializer.java | 1 +
.../org/apache/tajo/storage/BufferPool.java | 74 ++
.../tajo/storage/ByteBufInputChannel.java | 76 ++
.../java/org/apache/tajo/storage/CSVFile.java | 23 +-
.../storage/FieldSerializerDeserializer.java | 35 +
.../java/org/apache/tajo/storage/RawFile.java | 185 ++---
.../tajo/storage/SerializerDeserializer.java | 2 +-
.../storage/TextSerializerDeserializer.java | 1 +
.../tajo/storage/text/ByteBufLineReader.java | 154 ++++
.../tajo/storage/text/DelimitedLineReader.java | 157 +++++
.../tajo/storage/text/DelimitedTextFile.java | 484 +++++++++++++
.../tajo/storage/text/FieldSplitProcessor.java | 38 +
.../tajo/storage/text/LineSplitProcessor.java | 45 ++
.../text/TextFieldSerializerDeserializer.java | 227 ++++++
.../src/main/resources/storage-default.xml | 59 +-
.../tajo/storage/TestCompressionStorages.java | 76 +-
.../org/apache/tajo/storage/TestLineReader.java | 163 +++++
.../apache/tajo/storage/TestSplitProcessor.java | 72 ++
.../org/apache/tajo/storage/TestStorages.java | 18 +-
.../src/test/resources/storage-default.xml | 18 +-
147 files changed, 8574 insertions(+), 1624 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/CHANGES
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 235d827,b3b4dbb..e30b4f9
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@@ -41,8 -41,10 +41,9 @@@ import org.apache.tajo.rpc.NettyClientB
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.util.HAServiceUtil;
import org.apache.tajo.util.NetUtils;
+ import org.apache.tajo.util.history.QueryHistory;
import org.apache.tajo.worker.TajoWorker;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index c713660,0f275e9..6bc185e
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@@ -42,9 -43,10 +43,11 @@@ import org.apache.tajo.master.event.Que
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+ import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TajoIdUtils;
+ import org.apache.tajo.util.history.QueryUnitHistory;
import org.apache.tajo.worker.FetchImpl;
import java.net.URI;
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index ec9afaa,96534df..18d4c28
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@@ -63,9 -62,10 +63,11 @@@ import org.apache.tajo.plan.util.Planne
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
+ import org.apache.tajo.util.history.QueryUnitHistory;
+ import org.apache.tajo.util.history.SubQueryHistory;
import org.apache.tajo.worker.FetchImpl;
import java.io.IOException;
@@@ -281,8 -282,10 +283,9 @@@ public class SubQuery implements EventH
private TaskSchedulerContext schedulerContext;
private List<IntermediateEntry> hashShuffleIntermediateEntries = new
ArrayList<IntermediateEntry>();
private AtomicInteger completeReportReceived = new AtomicInteger(0);
+ private SubQueryHistory finalSubQueryHistory;
- public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan
masterPlan,
- ExecutionBlock block, StorageManager sm) {
+ public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan
masterPlan, ExecutionBlock block) {
this.context = context;
this.masterPlan = masterPlan;
this.block = block;
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index f72a5a1,2fae243..de3912a
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@@ -32,7 -32,7 +32,8 @@@ import org.apache.tajo.common.TajoDataT
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
+ import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.BitArray;
@@@ -57,13 -57,16 +58,16 @@@ public class RawFile
private int headerSize = 0; // Header size of a tuple
private BitArray nullFlags;
private static final int RECORD_SIZE = 4;
- private boolean eof = false;
- private long fileLimit; // If this.fragment represents a complete file,
this value is equal to the file's size
- private long numBytesRead;
+ private boolean eos = false;
+ private long startOffset;
+ private long endOffset;
private FileInputStream fis;
private long recordCount;
+ private long totalReadBytes;
+ private long filePosition;
+ private boolean forceFillBuffer;
- public RawFileScanner(Configuration conf, Schema schema, TableMeta meta,
FileFragment fragment) throws IOException {
+ public RawFileScanner(Configuration conf, Schema schema, TableMeta meta,
Fragment fragment) throws IOException {
super(conf, schema, meta, fragment);
}
@@@ -81,22 -84,16 +85,16 @@@
fis = new FileInputStream(file);
channel = fis.getChannel();
- fileLimit = fragment.getStartKey() + fragment.getLength(); // fileLimit
is less than or equal to fileSize
+ filePosition = startOffset = fragment.getStartKey();
- endOffset = fragment.getStartKey() + fragment.getEndKey();
++ endOffset = fragment.getStartKey() + fragment.getLength();
- if (tableStats != null) {
- tableStats.setNumBytes(fragment.getLength());
- }
if (LOG.isDebugEnabled()) {
- LOG.debug("RawFileScanner open:" + fragment + "," +
channel.position() + ", total file size :" + channel.size()
- + ", fragment size :" + fragment.getLength() + ", fileLimit: " +
fileLimit);
+ LOG.debug("RawFileScanner open:" + fragment + "," +
channel.position() + ", file size :" + channel.size()
- + ", fragment length :" + fragment.getEndKey());
++ + ", fragment length :" + fragment.getLength());
}
- if (fragment.getLength() < 64 * StorageUnit.KB) {
- bufferSize = (int)fragment.getLength();
- } else {
- bufferSize = 64 * StorageUnit.KB;
- }
- buffer = ByteBuffer.allocateDirect(bufferSize);
+ buf = BufferPool.directBuffer(64 * StorageUnit.KB);
+ buffer = buf.nioBuffer(0, buf.capacity());
columnTypes = new DataType[schema.size()];
for (int i = 0; i < schema.size(); i++) {
@@@ -124,16 -120,22 +121,22 @@@
@Override
public void seek(long offset) throws IOException {
- long currentPos = channel.position();
- if(currentPos < offset && offset < currentPos + buffer.limit()){
- buffer.position((int)(offset - currentPos));
+ eos = false;
+ filePosition = channel.position();
+
+ // do not fill the buffer if the offset is already included in the
buffer.
+ if(!forceFillBuffer && filePosition > offset && offset > filePosition -
buffer.limit()){
+ buffer.position((int)(offset - (filePosition - buffer.limit())));
} else {
- buffer.clear();
- if(offset < startOffset || offset > startOffset +
fragment.getEndKey()){
++ if(offset < startOffset || offset > startOffset +
fragment.getLength()){
+ throw new IndexOutOfBoundsException(String.format("range(%d, %d),
offset: %d",
- startOffset, startOffset + fragment.getEndKey(), offset));
++ startOffset, startOffset + fragment.getLength(), offset));
+ }
channel.position(offset);
- int bytesRead = channel.read(buffer);
- numBytesRead = bytesRead;
- buffer.flip();
- eof = false;
+ filePosition = offset;
+ buffer.clear();
+ forceFillBuffer = true;
+ fillBuffer();
}
}
@@@ -421,28 -428,25 +429,25 @@@
}
@Override
- public float getProgress() {
- try {
+ public TableStats getInputStats() {
+ if(tableStats != null){
tableStats.setNumRows(recordCount);
- long filePos = 0;
- if (channel != null) {
- filePos = channel.position();
- tableStats.setReadBytes(filePos);
- }
+ tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan +
rescan * n)
- tableStats.setNumBytes(fragment.getEndKey());
++ tableStats.setNumBytes(fragment.getLength());
+ }
+ return tableStats;
+ }
- if(eof || channel == null) {
- tableStats.setReadBytes(fragment.getLength());
- return 1.0f;
- }
+ @Override
+ public float getProgress() {
+ if(eos) {
+ return 1.0f;
+ }
- if (filePos == 0) {
- return 0.0f;
- } else {
- return Math.min(1.0f, ((float)filePos /
(float)fragment.getLength()));
- }
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
+ if (filePosition - startOffset == 0) {
return 0.0f;
+ } else {
+ return Math.min(1.0f, ((float) filePosition / endOffset));
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index 0000000,eb1929e..d9e2016
mode 000000,100644..100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@@ -1,0 -1,157 +1,157 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.storage.text;
+
+ import io.netty.buffer.ByteBuf;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.compress.CompressionCodec;
+ import org.apache.hadoop.io.compress.CompressionCodecFactory;
+ import org.apache.hadoop.io.compress.Decompressor;
+ import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+ import org.apache.tajo.common.exception.NotImplementedException;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.storage.ByteBufInputChannel;
+ import org.apache.tajo.storage.FileScanner;
+ import org.apache.tajo.storage.BufferPool;
+ import org.apache.tajo.storage.compress.CodecPool;
+ import org.apache.tajo.storage.fragment.FileFragment;
+
+ import java.io.Closeable;
+ import java.io.DataInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.concurrent.atomic.AtomicInteger;
+
+ public class DelimitedLineReader implements Closeable {
+ private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
+ private final static int DEFAULT_PAGE_SIZE = 128 * 1024;
+
+ private FileSystem fs;
+ private FSDataInputStream fis;
+ private InputStream is; //decompressd stream
+ private CompressionCodecFactory factory;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
+
+ private long startOffset, end, pos;
+ private boolean eof = true;
+ private ByteBufLineReader lineReader;
+ private AtomicInteger tempReadBytes = new AtomicInteger();
+ private FileFragment fragment;
+ private Configuration conf;
+
+ public DelimitedLineReader(Configuration conf, final FileFragment fragment)
throws IOException {
+ this.fragment = fragment;
+ this.conf = conf;
+ this.factory = new CompressionCodecFactory(conf);
+ this.codec = factory.getCodec(fragment.getPath());
+ if (this.codec instanceof SplittableCompressionCodec) {
+ throw new NotImplementedException(); // bzip2 does not support
multi-thread model
+ }
+ }
+
+ public void init() throws IOException {
+ if (fs == null) {
+ fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath());
+ }
+ if (fis == null) fis = fs.open(fragment.getPath());
+ pos = startOffset = fragment.getStartKey();
- end = startOffset + fragment.getEndKey();
++ end = startOffset + fragment.getLength();
+
+ if (codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ is = new DataInputStream(codec.createInputStream(fis, decompressor));
+ ByteBufInputChannel channel = new ByteBufInputChannel(is);
+ lineReader = new ByteBufLineReader(channel,
BufferPool.directBuffer(DEFAULT_PAGE_SIZE));
+ } else {
+ fis.seek(startOffset);
+ is = fis;
+
+ ByteBufInputChannel channel = new ByteBufInputChannel(is);
+ lineReader = new ByteBufLineReader(channel,
+ BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end)));
+ }
+ eof = false;
+ }
+
+ public long getCompressedPosition() throws IOException {
+ long retVal;
+ if (isCompressed()) {
+ retVal = fis.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
+
+ public long getUnCompressedPosition() throws IOException {
+ return pos;
+ }
+
+ public long getReadBytes() {
+ return pos - startOffset;
+ }
+
+ public boolean isReadable() {
+ return !eof;
+ }
+
+ public ByteBuf readLine() throws IOException {
+ if (eof) {
+ return null;
+ }
+
+ ByteBuf buf = lineReader.readLineBuf(tempReadBytes);
+ if (buf == null) {
+ eof = true;
+ } else {
+ pos += tempReadBytes.get();
+ }
+
+ if (!isCompressed() && getCompressedPosition() > end) {
+ eof = true;
+ }
+ return buf;
+ }
+
+ public boolean isCompressed() {
+ return codec != null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ IOUtils.cleanup(LOG, lineReader, is, fis);
+ fs = null;
+ is = null;
+ fis = null;
+ lineReader = null;
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 0000000,dbf8435..aad97bc
mode 000000,100644..100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@@ -1,0 -1,483 +1,484 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.storage.text;
+
+ import io.netty.buffer.ByteBuf;
+ import org.apache.commons.lang.StringEscapeUtils;
+ import org.apache.commons.lang.StringUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.compress.CompressionCodec;
+ import org.apache.hadoop.io.compress.CompressionCodecFactory;
+ import org.apache.hadoop.io.compress.CompressionOutputStream;
+ import org.apache.hadoop.io.compress.Compressor;
+ import org.apache.hadoop.util.ReflectionUtils;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.TableMeta;
+ import org.apache.tajo.catalog.statistics.TableStats;
+ import org.apache.tajo.datum.Datum;
+ import org.apache.tajo.datum.NullDatum;
+ import org.apache.tajo.storage.*;
+ import org.apache.tajo.storage.compress.CodecPool;
+ import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+ import org.apache.tajo.storage.fragment.FileFragment;
++import org.apache.tajo.storage.fragment.Fragment;
+ import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+ import java.io.BufferedOutputStream;
+ import java.io.DataOutputStream;
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.util.Arrays;
+
+ public class DelimitedTextFile {
+
+ public static final byte LF = '\n';
+ public static int EOF = -1;
+
+ private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
+
+ public static class DelimitedTextFileAppender extends FileAppender {
+ private final TableMeta meta;
+ private final Schema schema;
+ private final int columnNum;
+ private final FileSystem fs;
+ private FSDataOutputStream fos;
+ private DataOutputStream outputStream;
+ private CompressionOutputStream deflateFilter;
+ private char delimiter;
+ private TableStatistics stats = null;
+ private Compressor compressor;
+ private CompressionCodecFactory codecFactory;
+ private CompressionCodec codec;
+ private Path compressedPath;
+ private byte[] nullChars;
+ private int BUFFER_SIZE = 128 * 1024;
+ private int bufferedBytes = 0;
+ private long pos = 0;
+
+ private NonSyncByteArrayOutputStream os;
+ private FieldSerializerDeserializer serde;
+
+ public DelimitedTextFileAppender(Configuration conf, final Schema schema,
final TableMeta meta, final Path path)
+ throws IOException {
+ super(conf, schema, meta, path);
+ this.fs = path.getFileSystem(conf);
+ this.meta = meta;
+ this.schema = schema;
+ this.delimiter =
StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER,
+ StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+ this.columnNum = schema.size();
+
+ String nullCharacters =
StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL,
+ NullDatum.DEFAULT_TEXT));
+ if (StringUtils.isEmpty(nullCharacters)) {
+ nullChars = NullDatum.get().asTextBytes();
+ } else {
+ nullChars = nullCharacters.getBytes();
+ }
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (!fs.exists(path.getParent())) {
+ throw new FileNotFoundException(path.toString());
+ }
+
+ if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+ String codecName =
this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+ codecFactory = new CompressionCodecFactory(conf);
+ codec = codecFactory.getCodecByClassName(codecName);
+ compressor = CodecPool.getCompressor(codec);
+ if (compressor != null) compressor.reset(); //builtin gzip is null
+
+ String extension = codec.getDefaultExtension();
+ compressedPath = path.suffix(extension);
+
+ if (fs.exists(compressedPath)) {
+ throw new AlreadyExistsStorageException(compressedPath);
+ }
+
+ fos = fs.create(compressedPath);
+ deflateFilter = codec.createOutputStream(fos, compressor);
+ outputStream = new DataOutputStream(deflateFilter);
+
+ } else {
+ if (fs.exists(path)) {
+ throw new AlreadyExistsStorageException(path);
+ }
+ fos = fs.create(path);
+ outputStream = new DataOutputStream(new BufferedOutputStream(fos));
+ }
+
+ if (enabledStats) {
+ this.stats = new TableStatistics(this.schema);
+ }
+
+ try {
+ // we need to discuss the De/Serializer interface. so custom serde is
to disable
+ String serdeClass =
this.meta.getOption(StorageConstants.TEXTFILE_SERDE,
+ TextFieldSerializerDeserializer.class.getName());
+ serde = (TextFieldSerializerDeserializer)
ReflectionUtils.newInstance(Class.forName(serdeClass), conf);
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
+ if (os == null) {
+ os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+ }
+
+ os.reset();
+ pos = fos.getPos();
+ bufferedBytes = 0;
+ super.init();
+ }
+
+
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ Datum datum;
+ int rowBytes = 0;
+
+ for (int i = 0; i < columnNum; i++) {
+ datum = tuple.get(i);
+ rowBytes += serde.serialize(os, datum, schema.getColumn(i), i,
nullChars);
+
+ if (columnNum - 1 > i) {
+ os.write((byte) delimiter);
+ rowBytes += 1;
+ }
+ }
+ os.write(LF);
+ rowBytes += 1;
+
+ pos += rowBytes;
+ bufferedBytes += rowBytes;
+ if (bufferedBytes > BUFFER_SIZE) {
+ flushBuffer();
+ }
+ // Statistical section
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ }
+
+ private void flushBuffer() throws IOException {
+ if (os.getLength() > 0) {
+ os.writeTo(outputStream);
+ os.reset();
+ bufferedBytes = 0;
+ }
+ }
+
+ @Override
+ public long getOffset() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushBuffer();
+ outputStream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ try {
+ if(outputStream != null){
+ flush();
+ }
+
+ // Statistical section
+ if (enabledStats) {
+ stats.setNumBytes(getOffset());
+ }
+
+ if (deflateFilter != null) {
+ deflateFilter.finish();
+ deflateFilter.resetState();
+ deflateFilter = null;
+ }
+
+ os.close();
+ } finally {
+ IOUtils.cleanup(LOG, fos);
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ compressor = null;
+ }
+ }
+ }
+
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+
+ public boolean isCompress() {
+ return compressor != null;
+ }
+
+ public String getExtension() {
+ return codec != null ? codec.getDefaultExtension() : "";
+ }
+ }
+
+ public static class DelimitedTextFileScanner extends FileScanner {
+
+ private boolean splittable = false;
+ private final long startOffset;
+ private final long endOffset;
+
+ private int recordCount = 0;
+ private int[] targetColumnIndexes;
+
+ private ByteBuf nullChars;
+ private FieldSerializerDeserializer serde;
+ private DelimitedLineReader reader;
+ private FieldSplitProcessor processor;
+
+ public DelimitedTextFileScanner(Configuration conf, final Schema schema,
final TableMeta meta,
- final FileFragment fragment)
++ final Fragment fragment)
+ throws IOException {
+ super(conf, schema, meta, fragment);
- reader = new DelimitedLineReader(conf, fragment);
++ reader = new DelimitedLineReader(conf, this.fragment);
+ if (!reader.isCompressed()) {
+ splittable = true;
+ }
+
- startOffset = fragment.getStartKey();
- endOffset = startOffset + fragment.getEndKey();
++ startOffset = this.fragment.getStartKey();
++ endOffset = startOffset + fragment.getLength();
+
+ //Delimiter
+ String delim = meta.getOption(StorageConstants.TEXT_DELIMITER,
StorageConstants.DEFAULT_FIELD_DELIMITER);
+ this.processor = new
FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0));
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (nullChars != null) {
+ nullChars.release();
+ }
+
+ String nullCharacters =
StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
+ NullDatum.DEFAULT_TEXT));
+ byte[] bytes;
+ if (StringUtils.isEmpty(nullCharacters)) {
+ bytes = NullDatum.get().asTextBytes();
+ } else {
+ bytes = nullCharacters.getBytes();
+ }
+
+ nullChars = BufferPool.directBuffer(bytes.length, bytes.length);
+ nullChars.writeBytes(bytes);
+
+ if (reader != null) {
+ reader.close();
+ }
+ reader = new DelimitedLineReader(conf, fragment);
+ reader.init();
+ recordCount = 0;
+
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ targetColumnIndexes = new int[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ targetColumnIndexes[i] =
schema.getColumnId(targets[i].getQualifiedName());
+ }
+
+ try {
+ // we need to discuss the De/Serializer interface. so custom serde is
to disable
+ String serdeClass =
this.meta.getOption(StorageConstants.TEXTFILE_SERDE,
+ TextFieldSerializerDeserializer.class.getName());
+ serde = (TextFieldSerializerDeserializer)
ReflectionUtils.newInstance(Class.forName(serdeClass), conf);
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+
+ super.init();
+ Arrays.sort(targetColumnIndexes);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + ","
+ startOffset + "," + endOffset);
+ }
+
+ if (startOffset > 0) {
+ reader.readLine(); // skip first line;
+ }
+ }
+
+ public ByteBuf readLine() throws IOException {
+ ByteBuf buf = reader.readLine();
+ if (buf == null) {
+ return null;
+ } else {
+ recordCount++;
+ }
+
+ return buf;
+ }
+
+ @Override
+ public float getProgress() {
+ try {
+ if (!reader.isReadable()) {
+ return 1.0f;
+ }
+ long filePos = reader.getCompressedPosition();
+ if (startOffset == filePos) {
+ return 0.0f;
+ } else {
+ long readBytes = filePos - startOffset;
+ long remainingBytes = Math.max(endOffset - filePos, 0);
+ return Math.min(1.0f, (float) (readBytes) / (float) (readBytes +
remainingBytes));
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return 0.0f;
+ }
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ try {
+ if (!reader.isReadable()) return null;
+
+ ByteBuf buf = readLine();
+ if (buf == null) return null;
+
+ if (targets.length == 0) {
+ return EmptyTuple.get();
+ }
+
+ VTuple tuple = new VTuple(schema.size());
+ fillTuple(schema, tuple, buf, targetColumnIndexes);
+ return tuple;
+ } catch (Throwable t) {
+ LOG.error("Tuple list current index: " + recordCount + " file
offset:" + reader.getCompressedPosition(), t);
+ throw new IOException(t);
+ }
+ }
+
+ private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[]
target) throws IOException {
+ int[] projection = target;
+ if (lineBuf == null || target == null || target.length == 0) {
+ return;
+ }
+
+ final int rowLength = lineBuf.readableBytes();
+ int start = 0, fieldLength = 0, end = 0;
+
+ //Projection
+ int currentTarget = 0;
+ int currentIndex = 0;
+
+ while (end != -1) {
+ end = lineBuf.forEachByte(start, rowLength - start, processor);
+
+ if (end < 0) {
+ fieldLength = rowLength - start;
+ } else {
+ fieldLength = end - start;
+ }
+
+ if (projection.length > currentTarget && currentIndex ==
projection[currentTarget]) {
+ Datum datum = serde.deserialize(lineBuf.slice(start, fieldLength),
+ schema.getColumn(currentIndex), currentIndex, nullChars);
+ dst.put(currentIndex, datum);
+ currentTarget++;
+ }
+
+ if (projection.length == currentTarget) {
+ break;
+ }
+
+ start = end + 1;
+ currentIndex++;
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ init();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (nullChars != null) {
+ nullChars.release();
+ nullChars = null;
+ }
+
+ if (tableStats != null && reader != null) {
+ tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed
Bytes. (decompressed bytes + overhead)
+ tableStats.setNumRows(recordCount);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DelimitedTextFileScanner processed record:" +
recordCount);
+ }
+ } finally {
+ IOUtils.cleanup(LOG, reader);
+ reader = null;
+ }
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) {
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return splittable;
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ if (tableStats != null && reader != null) {
+ tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed
Bytes. (decompressed bytes + overhead)
+ tableStats.setNumRows(recordCount);
- tableStats.setNumBytes(fragment.getEndKey());
++ tableStats.setNumBytes(fragment.getLength());
+ }
+ return tableStats;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 0000000,ef6efdf..1a4bdba
mode 000000,100644..100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@@ -1,0 -1,163 +1,163 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.storage;
+
+ import io.netty.buffer.ByteBuf;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.io.compress.DeflateCodec;
+ import org.apache.tajo.catalog.CatalogUtil;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.TableMeta;
+ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+ import org.apache.tajo.common.TajoDataTypes.Type;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.datum.DatumFactory;
+ import org.apache.tajo.datum.NullDatum;
+ import org.apache.tajo.storage.fragment.FileFragment;
+ import org.apache.tajo.storage.text.ByteBufLineReader;
+ import org.apache.tajo.storage.text.DelimitedTextFile;
+ import org.apache.tajo.storage.text.DelimitedLineReader;
+ import org.apache.tajo.util.CommonTestingUtil;
+ import org.junit.Test;
+
+ import java.io.IOException;
+ import java.util.concurrent.atomic.AtomicInteger;
+
+ import static org.junit.Assert.*;
+
+ public class TestLineReader {
+ private static String TEST_PATH = "target/test-data/TestLineReader";
+
+ @Test
+ public void testByteBufLineReader() throws IOException {
+ TajoConf conf = new TajoConf();
+ Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ FileSystem fs = testDir.getFileSystem(conf);
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+ schema.addColumn("comment", Type.TEXT);
+ schema.addColumn("comment2", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+ Path tablePath = new Path(testDir, "line.data");
- FileAppender appender = (FileAppender)
StorageManager.getStorageManager(conf).getAppender(meta, schema,
++ FileAppender appender = (FileAppender)
StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
+ tablePath);
+ appender.enableStats();
+ appender.init();
+ int tupleNum = 10000;
+ VTuple vTuple;
+
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(4);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createInt8(25l));
+ vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+ vTuple.put(3, NullDatum.get());
+ appender.addTuple(vTuple);
+ }
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+
+ ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
+ assertEquals(status.getLen(), channel.available());
+ ByteBufLineReader reader = new ByteBufLineReader(channel);
+ assertEquals(status.getLen(), reader.available());
+
+ long totalRead = 0;
+ int i = 0;
+ AtomicInteger bytes = new AtomicInteger();
+ for(;;){
+ ByteBuf buf = reader.readLineBuf(bytes);
+ if(buf == null) break;
+
+ totalRead += bytes.get();
+ i++;
+ }
+ IOUtils.cleanup(null, reader, channel, fs);
+ assertEquals(tupleNum, i);
+ assertEquals(status.getLen(), totalRead);
+ assertEquals(status.getLen(), reader.readBytes());
+ }
+
+ @Test
+ public void testLineDelimitedReader() throws IOException {
+ TajoConf conf = new TajoConf();
+ Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ FileSystem fs = testDir.getFileSystem(conf);
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+ schema.addColumn("comment", Type.TEXT);
+ schema.addColumn("comment2", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
+ meta.putOption("compression.codec",
DeflateCodec.class.getCanonicalName());
+
+ Path tablePath = new Path(testDir, "line1." +
DeflateCodec.class.getSimpleName());
- FileAppender appender = (FileAppender)
StorageManager.getStorageManager(conf).getAppender(meta, schema,
++ FileAppender appender = (FileAppender)
StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
+ tablePath);
+ appender.enableStats();
+ appender.init();
+ int tupleNum = 10000;
+ VTuple vTuple;
+
+ long splitOffset = 0;
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(4);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createInt8(25l));
+ vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
+ vTuple.put(3, NullDatum.get());
+ appender.addTuple(vTuple);
+
+ if(i == (tupleNum / 2)){
+ splitOffset = appender.getOffset();
+ }
+ }
+ String extension = ((DelimitedTextFile.DelimitedTextFileAppender)
appender).getExtension();
+ appender.close();
+
+ tablePath = tablePath.suffix(extension);
+ FileFragment fragment = new FileFragment("table", tablePath, 0,
splitOffset);
+ DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); //
if file is compressed, will read to EOF
+ assertTrue(reader.isCompressed());
+ assertFalse(reader.isReadable());
+ reader.init();
+ assertTrue(reader.isReadable());
+
+
+ int i = 0;
+ while(reader.isReadable()){
+ ByteBuf buf = reader.readLine();
+ if(buf == null) break;
+ i++;
+ }
+
+ IOUtils.cleanup(null, reader, fs);
+ assertEquals(tupleNum, i);
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/03c3ea29/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --cc tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 8c6c7ad,a3f80cf..b1e3a28
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@@ -789,8 -791,8 +791,8 @@@ public class TestStorages
TableMeta meta = CatalogUtil.newTableMeta(storeType);
Path tablePath = new Path(testDir, "Seekable.data");
- FileAppender appender = (FileAppender)
StorageManager.getStorageManager(conf).getAppender(meta, schema,
+ FileAppender appender = (FileAppender)
StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
- tablePath);
+ tablePath);
appender.enableStats();
appender.init();
int tupleNum = 100000;