TAJO-1340: Change the default output file format. Closes #671
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8bbd51df Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8bbd51df Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8bbd51df Branch: refs/heads/master Commit: 8bbd51df292e4d74c95a54cb610754e1d6d77756 Parents: fb5da66 Author: Jinho Kim <[email protected]> Authored: Mon Sep 14 16:30:03 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Sep 14 16:30:03 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/client/QueryClient.java | 3 +- .../org/apache/tajo/client/QueryClientImpl.java | 75 +++++--- .../apache/tajo/client/SessionConnection.java | 1 + .../org/apache/tajo/client/TajoClientImpl.java | 13 +- .../org/apache/tajo/client/TajoClientUtil.java | 10 +- .../org/apache/tajo/jdbc/FetchResultSet.java | 80 ++++++--- .../apache/tajo/jdbc/TajoMemoryResultSet.java | 70 ++++++-- .../org/apache/tajo/jdbc/TajoResultSetBase.java | 26 +-- tajo-client/src/main/proto/ClientProtos.proto | 10 +- .../org/apache/tajo/TajoTestingCluster.java | 3 + tajo-common/pom.xml | 4 + .../main/java/org/apache/tajo/SessionVars.java | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 7 +- .../org/apache/tajo/tuple/BaseTupleBuilder.java | 11 ++ .../tuple/memory/OffHeapRowBlockWriter.java | 14 ++ .../tajo/tuple/memory/OffHeapRowWriter.java | 19 +- .../org/apache/tajo/util/CompressionUtil.java | 73 ++++++++ tajo-common/src/main/proto/DataTypes.proto | 1 + tajo-common/src/main/proto/tajo_protos.proto | 4 + .../tajo/client/TestQueryClientExceptions.java | 13 +- .../tajo/querymaster/TestTaskStatusUpdate.java | 6 +- .../testDistinctAggregation_case10.result | 2 +- .../TestTajoCli/testHelpSessionVars.result | 1 + .../testSelectResultWithNullFalse.result | 2 +- .../testSelectResultWithNullTrue.result | 2 +- ...estSelectResultWithNullTrueDeprecated.result | 2 +- .../TestTajoCli/testStopWhenError.result | 2 +- .../testStopWhenErrorDeprecated.result | 2 +- .../TestWindowQuery/testStdDevPop1.result | 4 +- .../TestWindowQuery/testStdDevSamp1.result | 8 +- .../tajo/engine/planner/global/DataChannel.java | 3 +- .../engine/planner/global/GlobalPlanner.java | 14 +- .../planner/physical/PhysicalPlanUtil.java | 17 +- .../engine/planner/physical/SeqScanExec.java | 38 +++- .../tajo/master/TajoMasterClientService.java | 27 ++- .../exec/NonForwardQueryResultFileScanner.java | 177 +++++++++++++++++-- .../exec/NonForwardQueryResultScanner.java | 20 ++- .../NonForwardQueryResultSystemScanner.java | 75 +++++++- .../apache/tajo/master/exec/QueryExecutor.java | 53 ++++-- .../java/org/apache/tajo/querymaster/Stage.java | 4 +- .../org/apache/tajo/jdbc/JdbcConnection.java | 18 +- .../org/apache/tajo/jdbc/TestResultSet.java | 75 ++++++-- .../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 88 ++++++++- .../tajo/plan/logical/PersistentStoreNode.java | 3 +- tajo-project/pom.xml | 5 + .../org/apache/tajo/rpc/RpcClientManager.java | 12 +- 47 files changed, 869 insertions(+), 232 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 28dcfa5..468a7a3 100644 --- a/CHANGES +++ b/CHANGES @@ -36,6 +36,8 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1340: Change the default output file format. (jinho) + TAJO-1835: TajoClient::executeQueryAndGetResult should throw Query(Failed|Killed)Exception. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java index ad9bfc5..f436afb 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java @@ -35,6 +35,7 @@ import java.io.Closeable; import java.sql.ResultSet; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; public interface QueryClient extends Closeable { @@ -106,7 +107,7 @@ public interface QueryClient extends Closeable { GetQueryResultResponse getResultResponse(QueryId queryId) throws TajoException; - TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws TajoException; + Future<TajoMemoryResultSet> fetchNextQueryResultAsync(final QueryId queryId, final int fetchRowNum); boolean updateQuery(final String sql) throws TajoException; http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 8b7f749..144e3b6 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -18,6 +18,7 @@ package org.apache.tajo.client; +import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,8 +33,9 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.v2.exception.ClientUnableToConnectException; +import org.apache.tajo.TajoProtos.CodecType; import org.apache.tajo.exception.*; -import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.QueryMasterClientProtocol; import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface; import org.apache.tajo.jdbc.FetchResultSet; @@ -48,17 +50,21 @@ import java.net.InetSocketAddress; import java.sql.ResultSet; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.apache.tajo.exception.ExceptionUtil.throwIfError; import static org.apache.tajo.exception.ExceptionUtil.throwsIfThisError; import static org.apache.tajo.exception.ReturnStateUtil.ensureOk; import static org.apache.tajo.exception.ReturnStateUtil.isSuccess; -import static org.apache.tajo.ipc.ClientProtos.*; import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService; public class QueryClientImpl implements QueryClient { private static final Log LOG = LogFactory.getLog(QueryClientImpl.class); + private static final CodecType DEFAULT_CODEC = CodecType.SNAPPY; + private final ExecutorService executor; private final SessionConnection conn; private final int defaultFetchRows; // maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit. @@ -69,6 +75,7 @@ public class QueryClientImpl implements QueryClient { this.defaultFetchRows = this.conn.getProperties().getInt(SessionVars.FETCH_ROWNUM.getConfVars().keyname(), SessionVars.FETCH_ROWNUM.getConfVars().defaultIntVal); this.maxRows = 0; + this.executor = Executors.newSingleThreadExecutor(); } @Override @@ -93,6 +100,7 @@ public class QueryClientImpl implements QueryClient { @Override public void close() { + executor.shutdown(); } @Override @@ -150,7 +158,7 @@ public class QueryClientImpl implements QueryClient { } @Override - public ClientProtos.SubmitQueryResponse executeQuery(final String sql) { + public SubmitQueryResponse executeQuery(final String sql) { final BlockingInterface stub = conn.getTMStub(); final QueryRequest request = buildQueryRequest(sql, false); @@ -171,7 +179,7 @@ public class QueryClientImpl implements QueryClient { } @Override - public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) { + public SubmitQueryResponse executeQueryWithJson(final String json) { final BlockingInterface stub = conn.getTMStub(); final QueryRequest request = buildQueryRequest(json, true); @@ -185,7 +193,7 @@ public class QueryClientImpl implements QueryClient { @Override public ResultSet executeQueryAndGetResult(String sql) throws TajoException { - ClientProtos.SubmitQueryResponse response = executeQuery(sql); + SubmitQueryResponse response = executeQuery(sql); throwIfError(response.getState()); QueryId queryId = new QueryId(response.getQueryId()); @@ -203,7 +211,7 @@ public class QueryClientImpl implements QueryClient { @Override public ResultSet executeJsonQueryAndGetResult(final String json) throws TajoException { - ClientProtos.SubmitQueryResponse response = executeQueryWithJson(json); + SubmitQueryResponse response = executeQueryWithJson(json); throwIfError(response.getState()); QueryId queryId = new QueryId(response.getQueryId()); @@ -325,30 +333,53 @@ public class QueryClientImpl implements QueryClient { } @Override - public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws TajoException { + public Future<TajoMemoryResultSet> fetchNextQueryResultAsync(final QueryId queryId, final int fetchRowNum) { + + final SettableFuture<TajoMemoryResultSet> future = SettableFuture.create(); + executor.submit(new Runnable() { + @Override + public void run() { + try { + future.set(fetchNextQueryResult(queryId, fetchRowNum)); + } catch (Throwable e) { + future.setException(e); + } + } + }); + return future; + } + + protected TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) + throws TajoException { + + boolean compress = conn.getProperties().getBool(SessionVars.COMPRESSED_RESULT_TRANSFER); final BlockingInterface stub = conn.getTMStub(); - final GetQueryResultDataRequest request = GetQueryResultDataRequest.newBuilder() - .setSessionId(conn.sessionId) + final GetQueryResultDataRequest.Builder request = GetQueryResultDataRequest.newBuilder(); + request.setSessionId(conn.sessionId) .setQueryId(queryId.getProto()) - .setFetchRowNum(fetchRowNum) - .build(); + .setFetchRowNum(fetchRowNum); + if (compress) { + request.setCompressCodec(DEFAULT_CODEC); + } GetQueryResultDataResponse response; try { - response = stub.getQueryResultData(null, request); + response = stub.getQueryResultData(null, request.build()); } catch (ServiceException e) { throw new RuntimeException(e); } throwIfError(response.getState()); - ClientProtos.SerializedResultSet resultSet = response.getResultSet(); - return new TajoMemoryResultSet(queryId, - new Schema(resultSet.getSchema()), - resultSet.getSerializedTuplesList(), - resultSet.getSerializedTuplesCount(), - getClientSideSessionVars()); + if(response.hasResultSet()) { + SerializedResultSet resultSet = response.getResultSet(); + return new TajoMemoryResultSet(queryId, + new Schema(resultSet.getSchema()), + resultSet, getClientSideSessionVars()); + } else { + return TajoClientUtil.createNullResultSet(queryId); + } } @Override @@ -388,7 +419,7 @@ public class QueryClientImpl implements QueryClient { } @Override - public List<ClientProtos.BriefQueryInfo> getRunningQueryList() { + public List<BriefQueryInfo> getRunningQueryList() { final BlockingInterface stmb = conn.getTMStub(); @@ -404,7 +435,7 @@ public class QueryClientImpl implements QueryClient { } @Override - public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() { + public List<BriefQueryInfo> getFinishedQueryList() { final BlockingInterface stub = conn.getTMStub(); @@ -420,7 +451,7 @@ public class QueryClientImpl implements QueryClient { } @Override - public List<ClientProtos.WorkerResourceInfo> getClusterInfo() { + public List<WorkerResourceInfo> getClusterInfo() { final BlockingInterface stub = conn.getTMStub(); final GetClusterInfoRequest request = GetClusterInfoRequest.newBuilder() @@ -555,7 +586,7 @@ public class QueryClientImpl implements QueryClient { } private QueryIdRequest buildQueryIdRequest(QueryId queryId) { - return ClientProtos.QueryIdRequest.newBuilder() + return QueryIdRequest.newBuilder() .setSessionId(SessionIdProto.newBuilder().setId(getSessionId())) .setQueryId(queryId.getProto()) .build(); http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index ac0ff52..3307ade 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -217,6 +217,7 @@ public class SessionConnection implements Closeable { ensureOk(response.getState()); updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + properties.putAll(sessionVarsCache); return Collections.unmodifiableMap(sessionVarsCache); } http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java index 3d69309..d9c7ff7 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -41,6 +41,7 @@ import java.net.URI; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; +import java.util.concurrent.Future; @ThreadSafe public class TajoClientImpl extends SessionConnection implements TajoClient, QueryClient, CatalogAdminClient { @@ -86,6 +87,13 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que public TajoClientImpl(ServiceTracker serviceTracker, @Nullable String baseDatabase) throws SQLException { this(serviceTracker, baseDatabase, new KeyValueSet()); } + + @Override + public void close() { + queryClient.close(); + super.close(); + } + /*------------------------------------------------------------------------*/ // QueryClient wrappers /*------------------------------------------------------------------------*/ @@ -130,8 +138,9 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que return queryClient.getResultResponse(queryId); } - public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws TajoException { - return queryClient.fetchNextQueryResult(queryId, fetchRowNum); + @Override + public Future<TajoMemoryResultSet> fetchNextQueryResultAsync(QueryId queryId, int fetchRowNum) { + return queryClient.fetchNextQueryResultAsync(queryId, fetchRowNum); } public boolean updateQuery(final String sql) throws TajoException { http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java index 3f30f97..baafbe8 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java @@ -33,7 +33,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import java.io.IOException; import java.sql.ResultSet; -import java.sql.SQLException; public class TajoClientUtil { @@ -101,16 +100,15 @@ public class TajoClientUtil { ClientProtos.SerializedResultSet serializedResultSet = response.getResultSet(); return new TajoMemoryResultSet(new QueryId(response.getQueryId()), new Schema(serializedResultSet.getSchema()), - serializedResultSet.getSerializedTuplesList(), - response.getMaxRowNum(), + serializedResultSet, client.getClientSideSessionVars()); } } public static final ResultSet NULL_RESULT_SET = - new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), null, 0, null); + new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), null, null); - public static ResultSet createNullResultSet(QueryId queryId) { - return new TajoMemoryResultSet(queryId, new Schema(), null, 0, null); + public static TajoMemoryResultSet createNullResultSet(QueryId queryId) { + return new TajoMemoryResultSet(queryId, new Schema(), null, null); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java index 8f15710..6bfa258 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java @@ -21,15 +21,23 @@ package org.apache.tajo.jdbc; import org.apache.tajo.QueryId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.client.QueryClient; +import org.apache.tajo.error.Errors; +import org.apache.tajo.exception.DefaultTajoException; +import org.apache.tajo.exception.SQLExceptionUtil; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.storage.Tuple; import java.io.IOException; import java.sql.SQLException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class FetchResultSet extends TajoResultSetBase { protected QueryClient tajoClient; private int fetchRowNum; private TajoMemoryResultSet currentResultSet; + private Future<TajoMemoryResultSet> nextResultSet; private boolean finished; // maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit. private int maxRows; @@ -49,35 +57,44 @@ public class FetchResultSet extends TajoResultSetBase { } try { - Tuple tuple = null; - if (currentResultSet != null) { - currentResultSet.next(); - tuple = currentResultSet.cur; - } - if (currentResultSet == null || tuple == null) { + while (!finished) { + Tuple tuple; if (currentResultSet != null) { - currentResultSet.close(); - currentResultSet = null; - } - currentResultSet = tajoClient.fetchNextQueryResult(queryId, fetchRowNum); - if (currentResultSet == null) { - finished = true; - return null; - } + currentResultSet.next(); + tuple = currentResultSet.cur; - currentResultSet.next(); - tuple = currentResultSet.cur; - } - if (tuple == null) { - if (currentResultSet != null) { - currentResultSet.close(); - currentResultSet = null; + if (tuple == null) { + currentResultSet.close(); + currentResultSet = null; + } else { + return tuple; + } + + } else { + if(nextResultSet == null) { + nextResultSet = tajoClient.fetchNextQueryResultAsync(queryId, fetchRowNum); + } else { + currentResultSet = nextResultSet.get(); + + if(currentResultSet.totalRow == 0) { + currentResultSet.close(); + currentResultSet = null; + nextResultSet = null; + finished = true; + } else { + // pre-fetch + nextResultSet = tajoClient.fetchNextQueryResultAsync(queryId, fetchRowNum); + } + } } - finished = true; } - return tuple; - } catch (Throwable t) { + + return null; + } catch (ExecutionException e) { + Throwable t = e.getCause(); throw new IOException(t.getMessage(), t); + } catch (Throwable t) { + throw new TajoInternalError(t); } } @@ -87,6 +104,21 @@ public class FetchResultSet extends TajoResultSetBase { currentResultSet.close(); currentResultSet = null; } + + if (nextResultSet != null) { + try { + nextResultSet.get(1, TimeUnit.SECONDS).close(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof DefaultTajoException) { + throw SQLExceptionUtil.toSQLException((DefaultTajoException) t); + } else { + throw new TajoSQLException(Errors.ResultCode.INTERNAL_ERROR, t, t.getMessage()); + } + } catch (Throwable t) { + throw new TajoSQLException(Errors.ResultCode.INTERNAL_ERROR, t, t.getMessage()); + } + } tajoClient.closeQuery(queryId); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java index 4114d03..49c9e5a 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java @@ -18,46 +18,78 @@ package org.apache.tajo.jdbc; -import com.google.protobuf.ByteString; +import io.netty.buffer.Unpooled; import org.apache.tajo.QueryId; import org.apache.tajo.catalog.Schema; -import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; import org.apache.tajo.storage.Tuple; +import org.apache.tajo.tuple.RowBlockReader; +import org.apache.tajo.tuple.memory.HeapRowBlockReader; +import org.apache.tajo.tuple.memory.HeapTuple; +import org.apache.tajo.tuple.memory.MemoryBlock; +import org.apache.tajo.tuple.memory.ResizableMemoryBlock; +import org.apache.tajo.util.CompressionUtil; import java.io.IOException; import java.sql.SQLException; -import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; public class TajoMemoryResultSet extends TajoResultSetBase { - private List<ByteString> serializedTuples; - private AtomicBoolean closed = new AtomicBoolean(false); - private RowStoreUtil.RowStoreDecoder decoder; + private MemoryBlock memory; + private RowBlockReader reader; + private volatile boolean closed; - public TajoMemoryResultSet(QueryId queryId, Schema schema, List<ByteString> serializedTuples, int maxRowNum, + + public TajoMemoryResultSet(QueryId queryId, Schema schema, SerializedResultSet resultSet, Map<String, String> clientSideSessionVars) { super(queryId, schema, clientSideSessionVars); - this.totalRow = maxRowNum; - this.serializedTuples = serializedTuples; - this.decoder = RowStoreUtil.createDecoder(schema); + if(resultSet != null && resultSet.getRows() > 0) { + this.totalRow = resultSet.getRows(); + + try { + // decompress if a codec is specified + if (resultSet.hasDecompressCodec()) { + byte[] compressed = resultSet.getSerializedTuples().toByteArray(); + byte[] uncompressed = CompressionUtil.decompress(resultSet.getDecompressCodec(), compressed); + memory = new ResizableMemoryBlock(Unpooled.wrappedBuffer(uncompressed)); + } else { + memory = new ResizableMemoryBlock(resultSet.getSerializedTuples().asReadOnlyByteBuffer()); + } + } catch (IOException e) { + throw new TajoInternalError(e); + } + + reader = new HeapRowBlockReader(memory, SchemaUtil.toDataTypes(schema), resultSet.getRows()); + } else { + this.totalRow = 0; + this.curRow = 0; + } } @Override protected void init() { cur = null; curRow = 0; + wasNull = false; } @Override - public synchronized void close() throws SQLException { - if (closed.getAndSet(true)) { + public void close() throws SQLException { + if (closed) { return; } + closed = true; cur = null; curRow = -1; - serializedTuples = null; + totalRow = 0; + reader = null; + if(memory != null) { + memory.release(); + memory = null; + } } @Override @@ -68,7 +100,13 @@ public class TajoMemoryResultSet extends TajoResultSetBase { @Override protected Tuple nextTuple() throws IOException { if (curRow < totalRow) { - cur = decoder.toTuple(serializedTuples.get(curRow).toByteArray()); + + HeapTuple heapTuple = new HeapTuple(); + if (reader.next(heapTuple)) { + cur = heapTuple; + } else { + cur = null; + } return cur; } else { return null; @@ -76,6 +114,6 @@ public class TajoMemoryResultSet extends TajoResultSetBase { } public boolean hasResult() { - return serializedTuples.size() > 0; + return totalRow > 0; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java index 3d8d9aa..a8d1239 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java @@ -106,7 +106,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private boolean getBoolean(Tuple tuple, int index) { - return handleNull(tuple, index) ? false : tuple.getBool(index); + return handleNull(tuple, index) ? false : tuple.asDatum(index).asBool(); } @Override @@ -120,7 +120,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private byte getByte(Tuple tuple, int index) { - return handleNull(tuple, index) ? 0 : tuple.getByte(index); + return handleNull(tuple, index) ? 0 : tuple.asDatum(index).asByte(); } @Override @@ -134,7 +134,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private byte[] getBytes(Tuple tuple, int index) { - return handleNull(tuple, index) ? null : tuple.getBytes(index); + return handleNull(tuple, index) ? null : tuple.asDatum(index).asByteArray(); } @Override @@ -148,7 +148,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private double getDouble(Tuple tuple, int index) { - return handleNull(tuple, index) ? 0.0d : tuple.getFloat8(index); + return handleNull(tuple, index) ? 0.0d : tuple.asDatum(index).asFloat8(); } @Override @@ -162,7 +162,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private float getFloat(Tuple tuple, int index) throws SQLException { - return handleNull(tuple, index) ? 0.0f : tuple.getFloat4(index); + return handleNull(tuple, index) ? 0.0f : tuple.asDatum(index).asFloat4(); } @Override @@ -176,7 +176,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private int getInt(Tuple tuple, int index) throws SQLException { - return handleNull(tuple, index) ? 0 : tuple.getInt4(index); + return handleNull(tuple, index) ? 0 : tuple.asDatum(index).asInt4(); } @Override @@ -190,7 +190,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private long getLong(Tuple tuple, int index) throws SQLException { - return handleNull(tuple, index) ? 0 : tuple.getInt8(index); + return handleNull(tuple, index) ? 0 : tuple.asDatum(index).asInt8(); } @Override @@ -232,7 +232,7 @@ public abstract class TajoResultSetBase implements ResultSet { return toTimestamp(tuple.getTimeDate(index), timezone); } default: - return tuple.getText(index); + return tuple.asDatum(index).asChars(); } } @@ -247,7 +247,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private short getShort(Tuple tuple, int index) throws SQLException { - return handleNull(tuple, index) ? 0 : tuple.getInt2(index); + return handleNull(tuple, index) ? 0 : tuple.asDatum(index).asInt2(); } @Override @@ -273,7 +273,7 @@ public abstract class TajoResultSetBase implements ResultSet { case TIMESTAMP: return TimestampDatum.asChars(tuple.getTimeDate(index), timezone, false); default : - return tuple.getText(index); + return tuple.asDatum(index).asChars(); } } @@ -298,7 +298,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private Date getDate(Tuple tuple, TimeZone tz, int index) throws SQLException { - return handleNull(tuple, index) ? null : toDate(tuple.getTimeDate(index), tz); + return handleNull(tuple, index) ? null : toDate(tuple.asDatum(index).asTimeMeta(), tz); } private Date toDate(TimeMeta tm, TimeZone tz) { @@ -329,7 +329,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private Time getTime(Tuple tuple, TimeZone tz, int index) throws SQLException { - return handleNull(tuple, index) ? null : toTime(tuple.getTimeDate(index), tz); + return handleNull(tuple, index) ? null : toTime(tuple.asDatum(index).asTimeMeta(), tz); } private Time toTime(TimeMeta tm, TimeZone tz) { @@ -360,7 +360,7 @@ public abstract class TajoResultSetBase implements ResultSet { } private Timestamp getTimestamp(Tuple tuple, TimeZone tz, int index) throws SQLException { - return handleNull(tuple, index) ? null : toTimestamp(tuple.getTimeDate(index), tz); + return handleNull(tuple, index) ? null : toTimestamp(tuple.asDatum(index).asTimeMeta(), tz); } private Timestamp toTimestamp(TimeMeta tm, TimeZone tz) { http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 49b0c40..13cb04b 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -27,6 +27,7 @@ import "tajo_protos.proto"; import "TajoIdProtos.proto"; import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; +import "DataTypes.proto"; message CreateSessionRequest { required string username = 1; @@ -114,9 +115,11 @@ message GetQueryStatusRequest { } message SerializedResultSet { - optional SchemaProto schema = 1; - optional int32 bytesNum = 2; - repeated bytes serializedTuples = 3; + required SchemaProto schema = 1; + required int32 rows = 2; + optional int32 decompressedLength = 3; + optional CodecType decompressCodec = 4; + optional bytes serializedTuples = 5; } message SubmitQueryResponse { @@ -161,6 +164,7 @@ message GetQueryResultDataRequest { required SessionIdProto sessionId = 1; required QueryIdProto queryId = 2; required int32 fetchRowNum = 3; + optional CodecType compressCodec = 4; } message GetQueryResultDataResponse { http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java index bb690dd..8b178a6 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -158,6 +158,9 @@ public class TajoTestingCluster { // Python function path conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString()); + // Query output file + conf.setVar(ConfVars.QUERY_OUTPUT_DEFAULT_FILE_FORMAT, BuiltinStorages.DRAW); + /* Since Travis CI limits the size of standard output log up to 4MB */ if (!StringUtils.isEmpty(LOG_LEVEL)) { Level defaultLevel = Logger.getRootLogger().getLevel(); http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-common/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-common/pom.xml b/tajo-common/pom.xml index 70d64a2..2376575 100644 --- a/tajo-common/pom.xml +++ b/tajo-common/pom.xml @@ -218,6 +218,10 @@ <artifactId>netty-buffer</artifactId> </dependency> <dependency> + <groupId>org.iq80.snappy</groupId> + <artifactId>snappy</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index 36234e0..7e419f0 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -149,6 +149,8 @@ public enum SessionVars implements ConfigKey { Integer.class, Validators.min("0")), BLOCK_ON_RESULT(ConfVars.$RESULT_SET_BLOCK_WAIT, "Whether to block result set on query execution", DEFAULT, Boolean.class, Validators.bool()), + COMPRESSED_RESULT_TRANSFER(ConfVars.$COMPRESSED_RESULT_TRANSFER, "Use compression to optimize result transmission.", + CLI_SIDE_VAR, Boolean.class, Validators.bool()), //------------------------------------------------------------------------------- // Only for Unit Testing http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 0f393f6..f9b9201 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.tajo.BuiltinStorages; import org.apache.tajo.ConfigKey; import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; @@ -204,7 +205,7 @@ public class TajoConf extends Configuration { // Shuffle Configuration -------------------------------------------------- PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")), SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()), - SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"), + SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()), SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2, Validators.min("1")), SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192), @@ -215,6 +216,9 @@ public class TajoConf extends Configuration { SHUFFLE_HASH_APPENDER_PAGE_VOLUME("tajo.shuffle.hash.appender.page.volumn-mb", 30), HASH_SHUFFLE_PARENT_DIRS("tajo.hash.shuffle.parent.dirs.count", 10), + // Query output Configuration -------------------------------------------------- + QUERY_OUTPUT_DEFAULT_FILE_FORMAT("tajo.query.output.file-format", BuiltinStorages.TEXT, Validators.javaString()), + // Storage Configuration -------------------------------------------------- ROWFILE_SYNC_INTERVAL("rowfile.sync.interval", 100), MINIMUM_SPLIT_SIZE("tajo.min.split.size", (long) 1), @@ -377,6 +381,7 @@ public class TajoConf extends Configuration { // ResultSet --------------------------------------------------------- $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200), $RESULT_SET_BLOCK_WAIT("tajo.resultset.block.wait", true), + $COMPRESSED_RESULT_TRANSFER("tajo.resultset.compression", false), ; public final String varname; http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java index 04a0267..cb417f3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java @@ -23,6 +23,7 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.tuple.memory.*; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.util.TUtil; public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable { @@ -64,6 +65,16 @@ public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, } @Override + public void addTuple(Tuple tuple) { + if (tuple instanceof UnSafeTuple) { + UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class); + addTuple(unSafeTuple); + } else { + OffHeapRowBlockUtils.convert(tuple, this); + } + } + + @Override public Tuple build() { return buildToHeapTuple(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java index 6832730..9f3d8a2 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowBlockWriter.java @@ -20,6 +20,8 @@ package org.apache.tajo.tuple.memory; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.TUtil; public class OffHeapRowBlockWriter extends OffHeapRowWriter { private RowBlock rowBlock; @@ -60,4 +62,16 @@ public class OffHeapRowBlockWriter extends OffHeapRowWriter { public TajoDataTypes.DataType[] dataTypes() { return rowBlock.getDataTypes(); } + + + @Override + public void addTuple(Tuple tuple) { + if (tuple instanceof UnSafeTuple) { + UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class); + addTuple(unSafeTuple); + rowBlock.setRows(rowBlock.rows() + 1); + } else { + OffHeapRowBlockUtils.convert(tuple, this); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java index c9b233f..f082762 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/OffHeapRowWriter.java @@ -23,9 +23,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.IntervalDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.storage.Tuple; import org.apache.tajo.util.SizeOf; -import org.apache.tajo.util.TUtil; /** * @@ -290,16 +288,11 @@ public abstract class OffHeapRowWriter implements RowWriter { putBlob(val.asByteArray()); } - @Override - public void addTuple(Tuple tuple) { - if (tuple instanceof UnSafeTuple) { - UnSafeTuple unSafeTuple = TUtil.checkTypeAndGet(tuple, UnSafeTuple.class); - int length = unSafeTuple.getLength(); - ensureSize(length); - PlatformDependent.copyMemory(unSafeTuple.address(), address() + position(), length); - forward(length); - } else { - OffHeapRowBlockUtils.convert(tuple, this); - } + + protected void addTuple(UnSafeTuple tuple) { + int length = tuple.getLength(); + ensureSize(length); + PlatformDependent.copyMemory(tuple.address(), address() + position(), length); + forward(length); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-common/src/main/java/org/apache/tajo/util/CompressionUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/CompressionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/CompressionUtil.java new file mode 100644 index 0000000..0026ea8 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/util/CompressionUtil.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import org.apache.tajo.TajoProtos.CodecType; +import org.apache.tajo.exception.UnsupportedException; +import org.iq80.snappy.Snappy; + +import java.io.IOException; + +public class CompressionUtil { + + public static byte[] compress(CodecType type, byte[] uncompressed) throws IOException { + switch (type) { + case SNAPPY: + return SnappyCodec.compress(uncompressed); + default: + throw new IOException(new UnsupportedException("Cannot support " + type)); + } + } + + public static byte[] decompress(CodecType type, byte[] compressed) throws IOException { + switch (type) { + case SNAPPY: + return SnappyCodec.uncompress(compressed); + default: + throw new IOException(new UnsupportedException("Cannot support " + type)); + } + } + + public static int maxCompressedLength(CodecType type, int byteSize) throws IOException { + switch (type) { + case SNAPPY: + return SnappyCodec.maxCompressedLength(byteSize); + default: + throw new IOException(new UnsupportedException("Cannot support " + type)); + } + } + + /** + * Java snappy codec + */ + static class SnappyCodec { + + static byte[] compress(byte[] uncompressed) throws IOException { + return Snappy.compress(uncompressed); + } + + static byte[] uncompress(byte[] compressed) throws IOException { + return Snappy.uncompress(compressed, 0, compressed.length); + } + + static int maxCompressedLength(int byteSize) throws IOException { + return Snappy.maxCompressedLength(byteSize); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-common/src/main/proto/DataTypes.proto ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/proto/DataTypes.proto b/tajo-common/src/main/proto/DataTypes.proto index 53c3d0f..1e10be2 100644 --- a/tajo-common/src/main/proto/DataTypes.proto +++ b/tajo-common/src/main/proto/DataTypes.proto @@ -114,3 +114,4 @@ message DataType { */ optional int32 num_nested_fields = 4; } + http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-common/src/main/proto/tajo_protos.proto ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto index 8474f54..2a836d3 100644 --- a/tajo-common/src/main/proto/tajo_protos.proto +++ b/tajo-common/src/main/proto/tajo_protos.proto @@ -68,4 +68,8 @@ message NodeResourceProto { optional int32 memory = 1; optional int32 virtual_cores = 2; optional int32 disks = 3; +} + +enum CodecType { + SNAPPY = 0; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java index 0457e23..fd25db9 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestQueryClientExceptions.java @@ -24,11 +24,15 @@ import org.apache.tajo.TpchTestBase; import org.apache.tajo.annotation.NotThreadSafe; import org.apache.tajo.error.Errors; import org.apache.tajo.exception.*; +import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + import static org.junit.Assert.assertEquals; @NotThreadSafe @@ -100,8 +104,13 @@ public class TestQueryClientExceptions { } @Test(expected = QueryNotFoundException.class) - public void testFetchNextQueryResult() throws TajoException { - client.fetchNextQueryResult(LocalTajoTestingUtility.newQueryId(), 100); + public void testAscynFetchNextQueryResult() throws Throwable { + Future<TajoMemoryResultSet> future = client.fetchNextQueryResultAsync(LocalTajoTestingUtility.newQueryId(), 100); + try { + future.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } } @Test(expected = QueryNotFoundException.class) http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java index 4654d38..f845bb3 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestTaskStatusUpdate.java @@ -59,7 +59,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { // tpch/lineitem.tbl long[] expectedNumRows = new long[]{5, 2, 2, 2}; - long[] expectedNumBytes = new long[]{604, 18, 18, 8}; + long[] expectedNumBytes = new long[]{604, 18, 18, 48}; long[] expectedReadBytes = new long[]{604, 604, 18, 0}; QueryId queryId = getQueryId(res); @@ -78,7 +78,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { // tpch/lineitem.tbl long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2}; - long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194}; + long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 236}; long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0}; QueryId queryId = getQueryId(res); @@ -107,7 +107,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { // in/out * stage(4) long[] expectedNumRows = new long[]{5, 5, 2, 2, 7, 2, 2, 2}; - long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 18}; + long[] expectedNumBytes = new long[]{20, 75, 8, 34, 109, 34, 34, 64}; long[] expectedReadBytes = new long[]{20, 20, 8, 8, 109, 0, 34, 0}; QueryId queryId = getQueryId(res); http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result b/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result index 2035d9f..ef41ed9 100644 --- a/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result +++ b/tajo-core-tests/src/test/resources/results/TestGroupByQuery/testDistinctAggregation_case10.result @@ -1,3 +1,3 @@ ?sum,?sum_1 ------------------------------- -3,414440.9 \ No newline at end of file +3,414440.89999999997 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index 51fb61f..2d87b56 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -42,4 +42,5 @@ Available Session Variables: \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs. \set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master \set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution +\set COMPRESSED_RESULT_TRANSFER [true or false] - Use compression to optimize result transmission. \set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result index a5a6039..6169092 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullFalse.result @@ -5,4 +5,4 @@ c_custkey, o_orderkey, o_orderstatus 3, 3, F 4, , 5, , -(5 rows, , 30 B selected) +(5 rows, , 127 B selected) http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result index 566262e..dfc920b 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrue.result @@ -5,4 +5,4 @@ c_custkey, o_orderkey, o_orderstatus 3, 3, F 4, testnull, testnull 5, testnull, testnull -(5 rows, , 30 B selected) +(5 rows, , 127 B selected) http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result index 36ea548..1212ade 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testSelectResultWithNullTrueDeprecated.result @@ -6,4 +6,4 @@ c_custkey, o_orderkey, o_orderstatus 3, 3, F 4, testnull, testnull 5, testnull, testnull -(5 rows, , 30 B selected) \ No newline at end of file +(5 rows, , 127 B selected) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result index e2e6b34..a0044c2 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenError.result @@ -1,5 +1,5 @@ ?count ------------------------------- 5 -(1 rows, , 2 B selected) +(1 rows, , 16 B selected) ERROR: relation 'default.lineitem2' does not exist \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result index 17b4d56..e3f1e6b 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testStopWhenErrorDeprecated.result @@ -2,5 +2,5 @@ Warning: deprecated to directly use config key in TajoConf.ConfVars. Please exec ?count ------------------------------- 5 -(1 rows, , 2 B selected) +(1 rows, , 16 B selected) ERROR: relation 'default.lineitem2' does not exist \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result index ef2e5d3..a2faac0 100644 --- a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result +++ b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevPop1.result @@ -3,5 +3,5 @@ linenumber_stddev_pop,suppkey_stddev_pop,extendedprice_stddev_pop,discount_stdde 0.5,197.5,12407.465,0.02500000223517418 0.5,197.5,12407.465,0.02500000223517418 0.0,0.0,0.0,0.0 -0.5,2371.0,3630.790000000001,0.02000000141561031 -0.5,2371.0,3630.790000000001,0.02000000141561031 \ No newline at end of file +0.5,2371.0,3630.790000000001,0.020000001415610313 +0.5,2371.0,3630.790000000001,0.020000001415610313 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result index 5db6227..625f91b 100644 --- a/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result +++ b/tajo-core-tests/src/test/resources/results/TestWindowQuery/testStdDevSamp1.result @@ -1,7 +1,7 @@ linenumber_stddev_samp,suppkey_stddev_samp,extendedprice_stddev_samp,discount_stddev_samp ------------------------------- -0.7071067811865476,279.30717856868625,17546.805277669493,0.03535534222034101 -0.7071067811865476,279.30717856868625,17546.805277669493,0.03535534222034101 +0.7071067811865476,279.30717856868625,17546.805277669493,0.035355342220341014 +0.7071067811865476,279.30717856868625,17546.805277669493,0.035355342220341014 null,null,null,null -0.7071067811865476,3353.1003563866084,5134.7124601286105,0.0282842732494372 -0.7071067811865476,3353.1003563866084,5134.7124601286105,0.0282842732494372 \ No newline at end of file +0.7071067811865476,3353.1003563866084,5134.7124601286105,0.028284273249437206 +0.7071067811865476,3353.1003563866084,5134.7124601286105,0.028284273249437206 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java index e09684a..2d32785 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.planner.global; import com.google.common.base.Preconditions; +import org.apache.tajo.BuiltinStorages; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; @@ -39,7 +40,7 @@ public class DataChannel { private Schema schema; - private String storeType = "RAW"; + private String storeType = BuiltinStorages.RAW; public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) { this.srcId = srcId; http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 8c6aa8c..5049995 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -37,9 +37,9 @@ import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteEngine; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRuleProvider; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.NotImplementedException; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; -import org.apache.tajo.exception.NotImplementedException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.Target; @@ -69,6 +69,7 @@ public class GlobalPlanner { private final TajoConf conf; private final String storeType; + private final String finalOutputStoreType; private final CatalogService catalog; private final GlobalPlanRewriteEngine rewriteEngine; @@ -77,7 +78,7 @@ public class GlobalPlanner { this.conf = conf; this.catalog = catalog; this.storeType = conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase(); - Preconditions.checkArgument(storeType != null); + this.finalOutputStoreType = conf.getVar(ConfVars.QUERY_OUTPUT_DEFAULT_FILE_FORMAT).toUpperCase(); Class<? extends GlobalPlanRewriteRuleProvider> clazz = (Class<? extends GlobalPlanRewriteRuleProvider>) conf.getClassVar(GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS); @@ -159,10 +160,10 @@ public class GlobalPlanner { LOG.info("\n\nOptimized master plan\n" + masterPlan.toString()); } - private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) { + private void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) { outputChannel.setShuffleType(NONE_SHUFFLE); outputChannel.setShuffleOutputNum(1); - outputChannel.setStoreType("TEXT"); + outputChannel.setStoreType(finalOutputStoreType); outputChannel.setSchema(outputSchema); } @@ -988,9 +989,8 @@ public class GlobalPlanner { ((TableSubQueryNode)child).getSubQuery().getType() == NodeType.UNION) { MasterPlan masterPlan = context.plan; for (DataChannel dataChannel : masterPlan.getIncomingChannels(execBlock.getId())) { - // This data channel will be stored in staging directory, but RawFile, default file type, does not support - // distributed file system. It needs to change the file format for distributed file system. - dataChannel.setStoreType("TEXT"); + + dataChannel.setStoreType(finalOutputStoreType); ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId()); ProjectionNode copy = PlannerUtil.clone(plan, node); http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index 1e90d04..e9f4fb9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.tajo.BuiltinStorages; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; @@ -209,13 +210,11 @@ public class PhysicalPlanUtil { */ private static void setNullCharForTextSerializer(TableMeta meta, String nullChar) { String storeType = meta.getStoreType(); - if (storeType.equalsIgnoreCase("TEXT")) { + if (storeType.equalsIgnoreCase(BuiltinStorages.TEXT)) { meta.putOption(StorageConstants.TEXT_NULL, nullChar); - } else if (storeType.equalsIgnoreCase("TEXT")) { - meta.putOption(StorageConstants.TEXT_NULL, nullChar); - } else if (storeType.equalsIgnoreCase("RCFILE")) { + } else if (storeType.equalsIgnoreCase(BuiltinStorages.RCFILE)) { meta.putOption(StorageConstants.RCFILE_NULL, nullChar); - } else if (storeType.equalsIgnoreCase("SEQUENCEFILE")) { + } else if (storeType.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) { meta.putOption(StorageConstants.SEQUENCEFILE_NULL, nullChar); } } @@ -228,13 +227,11 @@ public class PhysicalPlanUtil { */ public static boolean containsNullChar(TableMeta meta) { String storeType = meta.getStoreType(); - if (storeType.equalsIgnoreCase("TEXT")) { - return meta.containsOption(StorageConstants.TEXT_NULL); - } else if (storeType.equalsIgnoreCase("TEXT")) { + if (storeType.equalsIgnoreCase(BuiltinStorages.TEXT)) { return meta.containsOption(StorageConstants.TEXT_NULL); - } else if (storeType.equalsIgnoreCase("RCFILE")) { + } else if (storeType.equalsIgnoreCase(BuiltinStorages.RCFILE)) { return meta.containsOption(StorageConstants.RCFILE_NULL); - } else if (storeType.equalsIgnoreCase("SEQUENCEFILE")) { + } else if (storeType.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) { return meta.containsOption(StorageConstants.SEQUENCEFILE_NULL); } else { return false; http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 1ecabf1..63b5445 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -34,6 +34,7 @@ import org.apache.tajo.plan.expr.EvalTreeUtil; import org.apache.tajo.plan.expr.FieldEval; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; @@ -61,6 +62,8 @@ public class SeqScanExec extends ScanExec { // scanner iterator with filter or without filter private ScanIterator scanIt; + private boolean needProjection; + public SeqScanExec(TaskAttemptContext context, ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema()); @@ -198,7 +201,7 @@ public class SeqScanExec extends ScanExec { // for non-projected fields. Schema actualInSchema = scanner.isProjectable() ? projectedFields : inSchema; - this.projector = new Projector(context, actualInSchema, outSchema, plan.getTargets()); + initializeProjector(actualInSchema); if (plan.hasQual()) { qual.bind(context.getEvalContext(), actualInSchema); @@ -210,6 +213,37 @@ public class SeqScanExec extends ScanExec { super.init(); } + protected void initializeProjector(Schema actualInSchema){ + Target[] realTargets; + if (plan.getTargets() == null) { + realTargets = PlannerUtil.schemaToTargets(outSchema); + } else { + realTargets = plan.getTargets(); + } + + //if all column is selected and there is no have expression, projection can be skipped + if (realTargets.length == inSchema.size()) { + for (int i = 0; i < inSchema.size(); i++) { + if (realTargets[i].getEvalTree() instanceof FieldEval) { + FieldEval f = realTargets[i].getEvalTree(); + if(!f.getColumnRef().equals(inSchema.getColumn(i))) { + needProjection = true; + break; + } + } else { + needProjection = true; + break; + } + } + } else { + needProjection = true; + } + + if(needProjection) { + projector = new Projector(context, actualInSchema, outSchema, plan.getTargets()); + } + } + @Override public ScanNode getScanNode() { return plan; @@ -253,6 +287,8 @@ public class SeqScanExec extends ScanExec { while(scanIt.hasNext()) { Tuple t = scanIt.next(); + if(!needProjection) return t; + Tuple outTuple = projector.eval(t); outTuple.setOffset(t.getOffset()); return outTuple; http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 40bca04..cd43add 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -19,7 +19,6 @@ package org.apache.tajo.master; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; @@ -62,7 +61,6 @@ import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; @@ -533,7 +531,6 @@ public class TajoMasterClientService extends AbstractService { public GetQueryResultDataResponse getQueryResultData(RpcController controller, GetQueryResultDataRequest request) throws ServiceException { GetQueryResultDataResponse.Builder builder = GetQueryResultDataResponse.newBuilder(); - SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder(); try { context.getSessionManager().touch(request.getSessionId().getId()); @@ -561,29 +558,25 @@ public class TajoMasterClientService extends AbstractService { scanNode.init(resultTableDesc); } - queryResultScanner = - new NonForwardQueryResultFileScanner(context.getConf(), session.getSessionId(), queryId, scanNode, - resultTableDesc, Integer.MAX_VALUE); + if(request.hasCompressCodec()) { + queryResultScanner = new NonForwardQueryResultFileScanner(context.getConf(), session.getSessionId(), + queryId, scanNode, resultTableDesc, Integer.MAX_VALUE, request.getCompressCodec()); + } else { + queryResultScanner = new NonForwardQueryResultFileScanner(context.getConf(), + session.getSessionId(), queryId, scanNode, resultTableDesc, Integer.MAX_VALUE); + } + queryResultScanner.init(); session.addNonForwardQueryResultScanner(queryResultScanner); } - List<ByteString> rows = queryResultScanner.getNextRows(request.getFetchRowNum()); - - resultSetBuilder.setSchema(queryResultScanner.getLogicalSchema().getProto()); - resultSetBuilder.addAllSerializedTuples(rows); + SerializedResultSet resultSet = queryResultScanner.nextRowBlock(request.getFetchRowNum()); - builder.setResultSet(resultSetBuilder.build()); + builder.setResultSet(resultSet); builder.setState(OK); - - LOG.info("Send result to client for " + - request.getSessionId().getId() + "," + queryId + ", " + rows.size() + " rows"); - } catch (Throwable t) { printStackTraceIfError(LOG, t); - builder.setState(returnError(t)); - builder.setResultSet(resultSetBuilder.build()); // required field } return builder.build(); http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index 9e132b0..809b81f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -19,34 +19,48 @@ package org.apache.tajo.master.exec; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.TajoProtos.CodecType; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.physical.PartitionMergeScanExec; import org.apache.tajo.engine.planner.physical.ScanExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.TajoException; +import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.tuple.memory.MemoryBlock; +import org.apache.tajo.tuple.memory.MemoryRowBlock; +import org.apache.tajo.util.CompressionUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; -import org.apache.hadoop.fs.Path; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner { + private final static Log LOG = LogFactory.getLog(NonForwardQueryResultFileScanner.class); private QueryId queryId; private String sessionId; @@ -54,13 +68,25 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc private TableDesc tableDesc; private RowStoreEncoder rowEncoder; private int maxRow; - private int currentNumRows; + private boolean eof; + private volatile long totalRows; + private volatile int currentNumRows; + private volatile boolean isStopped; private TaskAttemptContext taskContext; private TajoConf tajoConf; private ScanNode scanNode; + private CodecType codecType; + private ExecutorService executor; + private MemoryRowBlock rowBlock; + private Future<MemoryRowBlock> nextFetch; + + public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, + TableDesc tableDesc, int maxRow) throws IOException { + this(tajoConf, sessionId, queryId, scanNode, tableDesc, maxRow, null); + } public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, - TableDesc tableDesc, int maxRow) throws IOException { + TableDesc tableDesc, int maxRow, CodecType codecType) throws IOException { this.tajoConf = tajoConf; this.sessionId = sessionId; this.queryId = queryId; @@ -68,6 +94,7 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc this.tableDesc = tableDesc; this.maxRow = maxRow; this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema()); + this.codecType = codecType; } public void init() throws IOException, TajoException { @@ -91,8 +118,10 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc new QueryContext(tajoConf), null, new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), fragmentProtos, null); - scanExec = new PartitionMergeScanExec(taskContext, scanNode, fragmentProtos); - scanExec.init(); + this.scanExec = new PartitionMergeScanExec(taskContext, scanNode, fragmentProtos); + this.scanExec.init(); + } else { + close(); } } @@ -113,32 +142,52 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc return currentNumRows; } - public void close() throws Exception { + public void close() throws IOException { + if(isStopped) { + return; + } + + isStopped = true; if (scanExec != null) { scanExec.close(); scanExec = null; } + + if(rowBlock != null) { + rowBlock.release(); + rowBlock = null; + } + + if(executor != null) { + executor.shutdown(); + } //remove temporal final output if (!tajoConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { Path temporalResultDir = TajoConf.getTemporalResultDir(tajoConf, queryId); if (tableDesc.getUri().equals(temporalResultDir.toUri())) { - temporalResultDir.getParent().getFileSystem(tajoConf).delete(temporalResultDir, true); + temporalResultDir.getFileSystem(tajoConf).delete(temporalResultDir.getParent(), true); } } + + + LOG.info(String.format("\"Sent result to client for %s, queryId: %s %s rows: %d", + sessionId, queryId, + codecType != null ? ", compression: " + codecType : "", + totalRows + )); } public List<ByteString> getNextRows(int fetchRowNum) throws IOException { - List<ByteString> rows = new ArrayList<ByteString>(); + List<ByteString> rows = new ArrayList<>(); if (scanExec == null) { return rows; } int rowCount = 0; - while (true) { + while (!eof) { Tuple tuple = scanExec.next(); if (tuple == null) { - scanExec.close(); - scanExec = null; + eof = true; break; } @@ -149,15 +198,117 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc break; } if (currentNumRows >= maxRow) { - scanExec.close(); - scanExec = null; + eof = true; break; } } + + if(eof) { + close(); + } return rows; } @Override + public SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException { + try { + final SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder(); + resultSetBuilder.setSchema(getLogicalSchema().getProto()); + resultSetBuilder.setRows(0); + + if (isStopped) return resultSetBuilder.build(); + + if (nextFetch == null) { + nextFetch = fetchNextRowBlock(fetchRowNum); + } + + MemoryRowBlock rowBlock = nextFetch.get(); + + if (rowBlock.rows() > 0) { + resultSetBuilder.setRows(rowBlock.rows()); + MemoryBlock memoryBlock = rowBlock.getMemory(); + + if (codecType != null) { + byte[] uncompressedBytes = new byte[memoryBlock.readableBytes()]; + memoryBlock.getBuffer().getBytes(0, uncompressedBytes); + + byte[] compressedBytes = CompressionUtil.compress(codecType, uncompressedBytes); + resultSetBuilder.setDecompressedLength(uncompressedBytes.length); + resultSetBuilder.setDecompressCodec(codecType); + resultSetBuilder.setSerializedTuples(ByteString.copyFrom(compressedBytes)); + } else { + ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes()); + resultSetBuilder.setDecompressedLength(uncompressed.remaining()); + resultSetBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed)); + } + } + + // pre-fetch + if (!eof) { + nextFetch = fetchNextRowBlock(fetchRowNum); + } else { + close(); + } + return resultSetBuilder.build(); + } catch (Throwable t) { + throw new IOException(t.getMessage(), t); + } + } + + /** + * Asynchronously fetch final output data + */ + private Future<MemoryRowBlock> fetchNextRowBlock(final int fetchRowNum) throws IOException { + final SettableFuture<MemoryRowBlock> future = SettableFuture.create(); + if (rowBlock == null) { + rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(tableDesc.getLogicalSchema())); + } + + if (scanExec == null) { + rowBlock.clear(); + future.set(rowBlock); + return future; + } + + if (executor == null) { + executor = Executors.newSingleThreadExecutor(); + } + + executor.submit(new Runnable() { + @Override + public void run() { + try { + rowBlock.clear(); + int endRow = currentNumRows + fetchRowNum; + while (currentNumRows < endRow) { + Tuple tuple = scanExec.next(); + if (tuple == null) { + eof = true; + break; + } else { + rowBlock.getWriter().addTuple(tuple); + currentNumRows++; + if (currentNumRows >= maxRow) { + eof = true; + break; + } + } + } + + if (rowBlock.rows() > 0) { + totalRows += rowBlock.rows(); + } + + future.set(rowBlock); + } catch (IOException e) { + future.setException(e); + } + } + }); + return future; + } + + @Override public Schema getLogicalSchema() { return tableDesc.getLogicalSchema(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java index a104c99..fab3a1f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java @@ -23,26 +23,30 @@ import org.apache.tajo.QueryId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.exception.TajoException; +import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; import java.io.IOException; import java.util.List; public interface NonForwardQueryResultScanner { - public void close() throws Exception; + void close() throws Exception; - public Schema getLogicalSchema(); + Schema getLogicalSchema(); - public List<ByteString> getNextRows(int fetchRowNum) throws IOException; + @Deprecated + List<ByteString> getNextRows(int fetchRowNum) throws IOException; - public QueryId getQueryId(); + SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException; + + QueryId getQueryId(); - public String getSessionId(); + String getSessionId(); - public TableDesc getTableDesc(); + TableDesc getTableDesc(); - public void init() throws IOException, TajoException; + void init() throws IOException, TajoException; - public int getCurrentRowNumber(); + int getCurrentRowNumber(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/8bbd51df/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index e176818..14077b1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -25,11 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.DataType; @@ -46,6 +42,7 @@ import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.rm.NodeStatus; import org.apache.tajo.plan.LogicalPlan; @@ -60,11 +57,14 @@ import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.tuple.memory.MemoryBlock; +import org.apache.tajo.tuple.memory.MemoryRowBlock; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -85,6 +85,8 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult private Schema outSchema; private RowStoreEncoder encoder; private PhysicalExec physicalExec; + private MemoryRowBlock rowBlock; + private boolean eof; public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId, String sessionId, int maxRow) { @@ -134,13 +136,16 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult encoder = RowStoreUtil.createEncoder(getLogicalSchema()); physicalExec.init(); + eof = false; } @Override - public void close() throws Exception { - tableDesc = null; - outSchema = null; - encoder = null; + public void close() { + if(rowBlock != null) { + rowBlock.release(); + rowBlock = null; + } + if (physicalExec != null) { try { physicalExec.close(); @@ -617,6 +622,58 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult } @Override + public SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException { + int rowCount = 0; + + SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder(); + resultSetBuilder.setSchema(getLogicalSchema().getProto()); + resultSetBuilder.setRows(rowCount); + int startRow = currentRow; + int endRow = startRow + fetchRowNum; + + if (physicalExec == null) { + return resultSetBuilder.build(); + } + + while (currentRow < endRow) { + Tuple currentTuple = physicalExec.next(); + + if (currentTuple == null) { + eof = true; + break; + } else { + if (rowBlock == null) { + rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(tableDesc.getLogicalSchema())); + } + + rowBlock.getWriter().addTuple(currentTuple); + currentRow++; + rowCount++; + + if(currentRow >= maxRow) { + eof = true; + break; + } + } + } + + if (rowCount > 0) { + resultSetBuilder.setRows(rowCount); + MemoryBlock memoryBlock = rowBlock.getMemory(); + ByteBuffer rows = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes()); + + resultSetBuilder.setDecompressedLength(rows.remaining()); + resultSetBuilder.setSerializedTuples(ByteString.copyFrom(rows)); + rowBlock.clear(); + } + + if (eof) { + close(); + } + return resultSetBuilder.build(); + } + + @Override public QueryId getQueryId() { return queryId; }
