Repository: tajo Updated Branches: refs/heads/master 1f9ae1da0 -> 62cc6f686
TAJO-1944: Support text resultset for REST Closes #876 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/62cc6f68 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/62cc6f68 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/62cc6f68 Branch: refs/heads/master Commit: 62cc6f686021c91c85edcde00383297b711af248 Parents: 1f9ae1d Author: charsyam <[email protected]> Authored: Sat Dec 26 00:27:29 2015 +0900 Committer: charsyam <[email protected]> Committed: Sat Dec 26 00:27:29 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../rs/resources/TestQueryResultResource.java | 133 ++++++++++++++++--- .../exec/NonForwardQueryResultFileScanner.java | 31 +++++ .../exec/NonForwardQueryResultScanner.java | 3 + .../NonForwardQueryResultSystemScanner.java | 32 +++++ .../tajo/ws/rs/annotation/RestReturnType.java | 34 +++++ .../ws/rs/resources/QueryResultResource.java | 82 +++++------- .../outputs/AbstractStreamingOutput.java | 43 ++++++ .../outputs/BinaryStreamingOutput.java | 102 ++++++++++++++ .../resources/outputs/CSVStreamingOutput.java | 129 ++++++++++++++++++ .../rs/resources/outputs/RestOutputFactory.java | 85 ++++++++++++ 11 files changed, 611 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index c02bfc7..fea896c 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,8 @@ Release 0.12.0 - unreleased IMPROVEMENT + TAJO-1944: Support text resultset for REST (DaeMyung) + TAJO-1950: Query master uses too much memory during range shuffle. (jihoon) TAJO-1858: Aligning error message in execute query page of web UI is needed. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java b/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java index 2ab1add..26fa011 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/ws/rs/resources/TestQueryResultResource.java @@ -18,7 +18,6 @@ package org.apache.tajo.ws.rs.resources; -import org.apache.commons.codec.binary.Base64; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf.ConfVars; @@ -44,6 +43,7 @@ import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -52,7 +52,6 @@ import java.io.DataInputStream; import java.io.EOFException; import java.io.InputStream; import java.net.URI; -import java.security.MessageDigest; import java.util.List; import static org.apache.tajo.exception.ErrorUtil.isOk; @@ -65,7 +64,6 @@ public class TestQueryResultResource extends QueryTestCaseBase { private Client restClient; private static final String tajoSessionIdHeaderName = "X-Tajo-Session"; - private static final String tajoDigestHeaderName = "X-Tajo-Digest"; private static final String tajoOffsetHeaderName = "X-Tajo-Offset"; private static final String tajoCountHeaderName = "X-Tajo-Count"; private static final String tajoEOSHeaderName = "X-Tajo-EOS"; @@ -176,7 +174,7 @@ public class TestQueryResultResource extends QueryTestCaseBase { } @Test - public void 
testGetQueryResultSet() throws Exception { + public void testGetQueryResultSetWithBinary() throws Exception { String sessionId = generateNewSessionAndGetId(); URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem"); URI queryResultURI = new URI(queryIdURI + "/result"); @@ -199,11 +197,10 @@ public class TestQueryResultResource extends QueryTestCaseBase { Response queryResultSetResponse = restClient.target(queryResultSetURI) .queryParam("count", 100) .request().header(tajoSessionIdHeaderName, sessionId) + .header(HttpHeaders.ACCEPT, "application/octet-stream") .get(); assertNotNull(queryResultSetResponse); - String tajoDigest = queryResultSetResponse.getHeaderString(tajoDigestHeaderName); - assertTrue(tajoDigest != null && !tajoDigest.isEmpty()); DataInputStream queryResultSetInputStream = new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class))); @@ -213,7 +210,6 @@ public class TestQueryResultResource extends QueryTestCaseBase { boolean isFinished = false; List<Tuple> tupleList = TUtil.newList(); RowStoreUtil.RowStoreDecoder decoder = RowStoreUtil.createDecoder(response.getSchema()); - MessageDigest messageDigest = MessageDigest.getInstance("SHA-1"); while (!isFinished) { try { int length = queryResultSetInputStream.readInt(); @@ -223,14 +219,12 @@ public class TestQueryResultResource extends QueryTestCaseBase { assertEquals(length, readBytes); tupleList.add(decoder.toTuple(dataByteArray)); - messageDigest.update(dataByteArray); } catch (EOFException eof) { isFinished = true; } } assertEquals(5, tupleList.size()); - assertEquals(tajoDigest, Base64.encodeBase64String(messageDigest.digest())); for (Tuple aTuple: tupleList) { assertTrue(aTuple.getInt4(response.getSchema().getColumnId("l_orderkey")) > 0); @@ -238,7 +232,7 @@ public class TestQueryResultResource extends QueryTestCaseBase { } @Test - public void testGetQueryResultSetWithDefaultCount() throws Exception { + public void 
testGetQueryResultSetWithDefaultCountWithBinary() throws Exception { String sessionId = generateNewSessionAndGetId(); URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem"); URI queryResultURI = new URI(queryIdURI + "/result"); @@ -260,19 +254,20 @@ public class TestQueryResultResource extends QueryTestCaseBase { Response queryResultSetResponse = restClient.target(queryResultSetURI) .request().header(tajoSessionIdHeaderName, sessionId) + .header(HttpHeaders.ACCEPT, "application/octet-stream") .get(); assertNotNull(queryResultSetResponse); - String tajoDigest = queryResultSetResponse.getHeaderString(tajoDigestHeaderName); int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName)); int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName)); boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName)); + int contentLength = Integer.valueOf(queryResultSetResponse.getHeaderString(HttpHeaders.CONTENT_LENGTH)); - assertTrue(tajoDigest != null && !tajoDigest.isEmpty()); assertTrue(eos); assertEquals(0, offset); assertEquals(5, count); + DataInputStream queryResultSetInputStream = new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class))); @@ -280,28 +275,136 @@ public class TestQueryResultResource extends QueryTestCaseBase { boolean isFinished = false; List<Tuple> tupleList = TUtil.newList(); + int receviedSize = 0; RowStoreUtil.RowStoreDecoder decoder = RowStoreUtil.createDecoder(response.getSchema()); - MessageDigest messageDigest = MessageDigest.getInstance("SHA-1"); while (!isFinished) { try { int length = queryResultSetInputStream.readInt(); + receviedSize += (length + 4); byte[] dataByteArray = new byte[length]; int readBytes = queryResultSetInputStream.read(dataByteArray); assertEquals(length, readBytes); tupleList.add(decoder.toTuple(dataByteArray)); - messageDigest.update(dataByteArray); } catch (EOFException eof) { 
isFinished = true; } } + assertEquals(contentLength, receviedSize); assertEquals(5, tupleList.size()); - assertEquals(tajoDigest, Base64.encodeBase64String(messageDigest.digest())); for (Tuple aTuple: tupleList) { assertTrue(aTuple.getInt4(response.getSchema().getColumnId("l_orderkey")) > 0); } } + + @Test + public void testGetQueryResultSetWithCSV() throws Exception { + String sessionId = generateNewSessionAndGetId(); + URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem"); + URI queryResultURI = new URI(queryIdURI + "/result"); + + GetQueryResultDataResponse response = restClient.target(queryResultURI) + .request().header(tajoSessionIdHeaderName, sessionId) + .get(new GenericType<>(GetQueryResultDataResponse.class)); + + assertNotNull(response); + assertNotNull(response.getResultCode()); + assertTrue(isOk(response.getResultCode())); + assertNotNull(response.getSchema()); + assertEquals(16, response.getSchema().getRootColumns().size()); + assertNotNull(response.getResultset()); + assertTrue(response.getResultset().getId() != 0); + assertNotNull(response.getResultset().getLink()); + + URI queryResultSetURI = response.getResultset().getLink(); + + Response queryResultSetResponse = restClient.target(queryResultSetURI) + .request().header(tajoSessionIdHeaderName, sessionId) + .header(HttpHeaders.ACCEPT, "text/csv") + .get(); + + assertNotNull(queryResultSetResponse); + int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName)); + int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName)); + boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName)); + int length = Integer.valueOf(queryResultSetResponse.getHeaderString(HttpHeaders.CONTENT_LENGTH)); + + assertTrue(eos); + assertEquals(0, offset); + assertEquals(5, count); + assertTrue(length > 0); + + DataInputStream queryResultSetInputStream = + new DataInputStream(new 
BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class))); + + assertNotNull(queryResultSetInputStream); + + try { + byte[] dataByteArray = new byte[length]; + int readBytes = queryResultSetInputStream.read(dataByteArray); + + assertEquals(length, readBytes); + + } catch (EOFException eof) { + } + + assertEquals(5, count); + } + + @Test + public void testGetQueryResultSetWithDefaultOutputType() throws Exception { + String sessionId = generateNewSessionAndGetId(); + URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem"); + URI queryResultURI = new URI(queryIdURI + "/result"); + + GetQueryResultDataResponse response = restClient.target(queryResultURI) + .request().header(tajoSessionIdHeaderName, sessionId) + .get(new GenericType<>(GetQueryResultDataResponse.class)); + + assertNotNull(response); + assertNotNull(response.getResultCode()); + assertTrue(isOk(response.getResultCode())); + assertNotNull(response.getSchema()); + assertEquals(16, response.getSchema().getRootColumns().size()); + assertNotNull(response.getResultset()); + assertTrue(response.getResultset().getId() != 0); + assertNotNull(response.getResultset().getLink()); + + URI queryResultSetURI = response.getResultset().getLink(); + + Response queryResultSetResponse = restClient.target(queryResultSetURI) + .request().header(tajoSessionIdHeaderName, sessionId) + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON) + .get(); + + assertNotNull(queryResultSetResponse); + int offset = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoOffsetHeaderName)); + int count = Integer.valueOf(queryResultSetResponse.getHeaderString(tajoCountHeaderName)); + boolean eos = Boolean.valueOf(queryResultSetResponse.getHeaderString(tajoEOSHeaderName)); + int length = Integer.valueOf(queryResultSetResponse.getHeaderString(HttpHeaders.CONTENT_LENGTH)); + + assertTrue(eos); + assertEquals(0, offset); + assertEquals(5, count); + assertTrue(length > 0); + + DataInputStream 
queryResultSetInputStream = + new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class))); + + assertNotNull(queryResultSetInputStream); + + try { + byte[] dataByteArray = new byte[length]; + int readBytes = queryResultSetInputStream.read(dataByteArray); + + assertEquals(length, readBytes); + + } catch (EOFException eof) { + } + + assertEquals(5, count); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index 8953315..a1728ec 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -179,6 +179,37 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc )); } + public List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException { + List<Tuple> rows = new ArrayList<>(); + if (scanExec == null) { + return rows; + } + int rowCount = 0; + while (!eof) { + Tuple tuple = scanExec.next(); + if (tuple == null) { + eof = true; + break; + } + + rows.add(tuple); + rowCount++; + currentNumRows++; + if (rowCount >= fetchRowNum) { + break; + } + if (currentNumRows >= maxRow) { + eof = true; + break; + } + } + + if(eof) { + close(); + } + return rows; + } + public List<ByteString> getNextRows(int fetchRowNum) throws IOException { List<ByteString> rows = new ArrayList<>(); if (scanExec == null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- 
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java index fab3a1f..e4f3475 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java @@ -24,6 +24,7 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.exception.TajoException; import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; +import org.apache.tajo.storage.Tuple; import java.io.IOException; import java.util.List; @@ -37,6 +38,8 @@ public interface NonForwardQueryResultScanner { @Deprecated List<ByteString> getNextRows(int fetchRowNum) throws IOException; + List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException; + SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException; QueryId getQueryId(); http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 7f6db9b..7f505a6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -622,6 +622,38 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult } @Override + public List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException { + List<Tuple> rows = new ArrayList<>(); + int startRow = currentRow; + int endRow = startRow + fetchRowNum; + + if (physicalExec == null) { + return rows; + } 
+ + while (currentRow < endRow) { + Tuple currentTuple = physicalExec.next(); + + if (currentTuple == null) { + physicalExec.close(); + physicalExec = null; + break; + } + + currentRow++; + rows.add(currentTuple); + + if (currentRow >= maxRow) { + physicalExec.close(); + physicalExec = null; + break; + } + } + + return rows; + } + + @Override public SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException { int rowCount = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java new file mode 100644 index 0000000..e37cfae --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/annotation/RestReturnType.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Definition of the return type of Rest API. + * According to the output type, Rest APIs return their results in text or binary. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface RestReturnType { + String mimeType(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java index 93d397a..7d5c78a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java @@ -36,6 +36,8 @@ import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.session.Session; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.ws.rs.*; +import org.apache.tajo.ws.rs.resources.outputs.AbstractStreamingOutput; +import org.apache.tajo.ws.rs.resources.outputs.RestOutputFactory; import org.apache.tajo.ws.rs.responses.GetQueryResultDataResponse; import org.apache.tajo.ws.rs.responses.ResultSetInfoResponse; @@ -69,6 +71,7 @@ public class QueryResultResource { private static final String cacheIdKeyName = "cacheId"; private static final String offsetKeyName = "offset"; private static final String countKeyName = "count"; + private static final String acceptTypeKeyName = "accept"; private static final String tajoDigestHeaderName = "X-Tajo-Digest"; private static final String tajoOffsetHeaderName = "X-Tajo-Offset"; @@ -256,8 +259,8 @@ public class QueryResultResource { @GET 
@Path("{cacheId}") - @Produces(MediaType.APPLICATION_OCTET_STREAM) public Response getQueryResultSet(@HeaderParam(QueryResource.tajoSessionIdHeaderName) String sessionId, + @HeaderParam(HttpHeaders.ACCEPT) String acceptType, @PathParam("cacheId") String cacheId, @DefaultValue("100") @QueryParam("count") int count) { if (LOG.isDebugEnabled()) { @@ -278,11 +281,16 @@ public class QueryResultResource { context.put(sessionIdKey, sessionId); JerseyResourceDelegateContextKey<Long> cacheIdKey = JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class); + context.put(cacheIdKey, Long.valueOf(cacheId)); JerseyResourceDelegateContextKey<Integer> countKey = JerseyResourceDelegateContextKey.valueOf(countKeyName, Integer.class); context.put(countKey, count); - + + JerseyResourceDelegateContextKey<String> acceptTypeKey = + JerseyResourceDelegateContextKey.valueOf(acceptTypeKeyName, String.class); + context.put(acceptTypeKey, acceptType); + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( new GetQueryResultSetDelegate(), application, @@ -307,6 +315,9 @@ public class QueryResultResource { JerseyResourceDelegateContextKey<String> queryIdKey = JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class); String queryId = context.get(queryIdKey); + JerseyResourceDelegateContextKey<String> acceptTypeKey = + JerseyResourceDelegateContextKey.valueOf(acceptTypeKeyName, String.class); + String acceptType = context.get(acceptTypeKey); JerseyResourceDelegateContextKey<Long> cacheIdKey = JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class); Long cacheId = context.get(cacheIdKey); @@ -345,59 +356,30 @@ public class QueryResultResource { clientApplication.getCachedNonForwardResultScanner(queryIdObj, cacheId.longValue()); try { - int start_offset = cachedQueryResultScanner.getCurrentRowNumber(); - List<ByteString> output = cachedQueryResultScanner.getNextRows(count); - String digestString = getEncodedBase64DigestString(output); - boolean eos 
= count != output.size(); - - return Response.ok(new QueryResultStreamingOutput(output)) - .header(tajoDigestHeaderName, digestString) - .header(tajoOffsetHeaderName, start_offset) - .header(tajoCountHeaderName, output.size()) - .header(tajoEOSHeaderName, eos) - .build(); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - - return ResourcesUtil.createExceptionResponse(null, e.getMessage()); - } catch (NoSuchAlgorithmException e) { - LOG.error(e.getMessage(), e); - - return ResourcesUtil.createExceptionResponse(null, e.getMessage()); - } - } - - private String getEncodedBase64DigestString(List<ByteString> outputList) throws NoSuchAlgorithmException { - MessageDigest messageDigest = MessageDigest.getInstance("SHA-1"); - - for (ByteString byteString: outputList) { - messageDigest.update(byteString.toByteArray()); - } - - return Base64.encodeBase64String(messageDigest.digest()); - } - } - - private static class QueryResultStreamingOutput implements StreamingOutput { + int startOffset = cachedQueryResultScanner.getCurrentRowNumber(); + AbstractStreamingOutput restOutput = RestOutputFactory.get(acceptType, cachedQueryResultScanner, count, startOffset); + if (restOutput == null) { + return ResourcesUtil.createExceptionResponse(null, acceptType); + } - private final List<ByteString> outputList; + int size = restOutput.count(); + boolean eos = count != size; - public QueryResultStreamingOutput(List<ByteString> outputList) { - this.outputList = outputList; - } + Response.ResponseBuilder builder = Response.ok(restOutput) + .header(tajoOffsetHeaderName, startOffset) + .header(tajoCountHeaderName, size) + .header(tajoEOSHeaderName, eos) + .header(HttpHeaders.CONTENT_TYPE, restOutput.contentType()); - @Override - public void write(OutputStream outputStream) throws IOException, WebApplicationException { - DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream)); + if (restOutput.hasLength()) { + 
builder.header(HttpHeaders.CONTENT_LENGTH, restOutput.length()); + } - for (ByteString byteString: outputList) { - byte[] byteStringArray = byteString.toByteArray(); - streamingOutputStream.writeInt(byteStringArray.length); - streamingOutputStream.write(byteStringArray); + return builder.build(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + return ResourcesUtil.createExceptionResponse(null, e.getMessage()); } - - streamingOutputStream.flush(); } } - } http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java new file mode 100644 index 0000000..f1e0eb5 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/AbstractStreamingOutput.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.resources.outputs; + +import org.apache.tajo.master.exec.NonForwardQueryResultScanner; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.StreamingOutput; + +public abstract class AbstractStreamingOutput implements StreamingOutput { + protected NonForwardQueryResultScanner scanner; + protected int count; + protected int startOffset; + + public AbstractStreamingOutput(NonForwardQueryResultScanner scanner, Integer count, Integer startoffset) { + this.scanner = scanner; + this.count = count; + this.startOffset = startoffset; + } + + public abstract boolean hasLength(); + public abstract int count(); + public abstract int length(); + + public String contentType() { + return MediaType.APPLICATION_OCTET_STREAM; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java new file mode 100644 index 0000000..13afc7b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/BinaryStreamingOutput.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.resources.outputs; + +import com.google.protobuf.ByteString; +import org.apache.tajo.master.exec.NonForwardQueryResultScanner; +import org.apache.tajo.ws.rs.annotation.RestReturnType; + +import javax.ws.rs.WebApplicationException; +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +@RestReturnType( + mimeType = "application/octet-stream" +) +public class BinaryStreamingOutput extends AbstractStreamingOutput { + private List<byte[]> byteOutputLists = null; + private int length = -1; + + public BinaryStreamingOutput(NonForwardQueryResultScanner scanner, Integer count, Integer startOffset) throws IOException { + super(scanner, count, startOffset); + } + + @Override + public boolean hasLength() { + return true; + } + + private void fetch() { + if (length == -1) { + length = fill(); + } + } + + @Override + public int length() { + fetch(); + return length; + } + + @Override + public int count() { + try { + fetch(); + return byteOutputLists.size(); + } catch (Exception e) { + return 0; + } + } + + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException { + fetch(); + + DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream)); + + for (byte[] bytes: byteOutputLists) { + streamingOutputStream.writeInt(bytes.length); + streamingOutputStream.write(bytes); + } + + streamingOutputStream.flush(); + } + + 
private int fill() { + int tmpLen = 0; + try { + byteOutputLists = new ArrayList<byte[]>(); + + List<ByteString> outputList = scanner.getNextRows(count); + for (ByteString byteString : outputList) { + byte[] byteStringArray = byteString.toByteArray(); + byteOutputLists.add(byteStringArray); + tmpLen += 4; + tmpLen += byteStringArray.length; + } + } catch (IOException e) { + } + + return tmpLen; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java new file mode 100644 index 0000000..8b2d3ab --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/CSVStreamingOutput.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tajo.ws.rs.resources.outputs;

import org.apache.commons.lang.StringEscapeUtils;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.ws.rs.annotation.RestReturnType;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

/**
 * Renders one page of a query result as CSV text for the REST API.
 *
 * <p>The rows are pulled from the scanner once, on first use, and the rendered
 * text is kept as UTF-8 bytes, so the length reported by {@link #length()} is
 * exactly the number of bytes {@link #write(OutputStream)} emits. When the page
 * starts at offset 0, a header line with the column names of the scanner's
 * logical schema precedes the data. Fields are separated by commas, escaped
 * with {@link StringEscapeUtils#escapeCsv(String)}, and every line ends with
 * CRLF.
 */
@RestReturnType(
  mimeType = "text/csv"
)
public class CSVStreamingOutput extends AbstractStreamingOutput {
  /** Content type of the payload; keep in sync with the {@link RestReturnType} annotation on this class. */
  private static final String CSV_MIME_TYPE = "text/csv";
  private static final String FIELD_SEPARATOR = ",";
  private static final String LINE_SEPARATOR = "\r\n";

  /** UTF-8 encoded CSV text; {@code null} until {@link #fetch()} has completed once. */
  private byte[] encodedOutput;
  /** Number of data rows (header excluded) contained in {@link #encodedOutput}. */
  private int size = 0;

  /**
   * @param cachedQueryResultScanner scanner the rows are read from
   * @param count maximum number of rows requested from the scanner
   * @param startOffset offset of this page; a value of 0 makes the output start with a header line
   * @throws IOException declared for callers; this constructor only delegates to the superclass
   */
  public CSVStreamingOutput(NonForwardQueryResultScanner cachedQueryResultScanner, Integer count, Integer startOffset) throws IOException {
    super(cachedQueryResultScanner, count, startOffset);
  }

  @Override
  public boolean hasLength() {
    return true;
  }

  /**
   * Returns the size of the payload in bytes, i.e. the number of bytes that
   * {@link #write(OutputStream)} sends. A character count would under-report
   * the size as soon as the result contains non-ASCII text.
   *
   * @return the UTF-8 byte length of the CSV text, or 0 if the rows could not be fetched
   */
  @Override
  public int length() {
    try {
      fetch();
      return encodedOutput.length;
    } catch (Exception ignored) {
      // Best effort, as before: a failed fetch is reported as an empty payload.
      return 0;
    }
  }

  /**
   * @return the number of data rows in this page, or 0 if the rows could not be fetched
   */
  @Override
  public int count() {
    try {
      fetch();
      return size;
    } catch (Exception ignored) {
      // Best effort, as before: a failed fetch is reported as zero rows.
      return 0;
    }
  }

  /**
   * Reads the rows from the scanner and renders them, unless that already happened.
   * Calling it again after a successful run is a no-op, so the scanner is not
   * advanced a second time.
   */
  private void fetch() throws IOException {
    if (encodedOutput != null) {
      return;
    }

    List<Tuple> rows = scanner.getNextTupleRows(count);
    size = rows.size();

    StringBuilder sb = new StringBuilder();
    if (startOffset == 0) {
      appendHeader(sb);
    }
    for (Tuple row : rows) {
      appendRow(sb, row);
    }

    encodedOutput = sb.toString().getBytes("utf-8");
  }

  /** Appends one line holding the escaped simple names of all columns of the logical schema. */
  private void appendHeader(StringBuilder sb) {
    Schema schema = this.scanner.getLogicalSchema();
    List<Column> columns = schema.getAllColumns();
    boolean first = true;
    for (Column column : columns) {
      if (!first) {
        sb.append(FIELD_SEPARATOR);
      }
      sb.append(StringEscapeUtils.escapeCsv(column.getSimpleName()));
      first = false;
    }
    sb.append(LINE_SEPARATOR);
  }

  /** Appends one line holding the escaped string form of every value of the tuple. */
  private void appendRow(StringBuilder sb, Tuple tuple) {
    Datum[] datums = tuple.getValues();
    for (int i = 0; i < datums.length; i++) {
      if (i != 0) {
        sb.append(FIELD_SEPARATOR);
      }
      sb.append(StringEscapeUtils.escapeCsv(datums[i].toString()));
    }
    sb.append(LINE_SEPARATOR);
  }

  /**
   * Writes the CSV payload to the given stream and flushes it.
   *
   * @param outputStream stream of the HTTP response body
   * @throws IOException if the rows cannot be fetched or the stream cannot be written
   */
  @Override
  public void write(OutputStream outputStream) throws IOException, WebApplicationException {
    fetch();
    DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream));
    streamingOutputStream.write(encodedOutput);
    streamingOutputStream.flush();
  }

  /**
   * @return {@code text/csv}, the type this class is registered for and the format it actually writes
   */
  @Override
  public String contentType() {
    return CSV_MIME_TYPE;
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/62cc6f68/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java new file mode 100644 index 0000000..90dd301 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/outputs/RestOutputFactory.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tajo.ws.rs.resources.outputs;

import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
import org.apache.tajo.util.ClassUtil;
import org.apache.tajo.ws.rs.annotation.RestReturnType;

import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.Set;

/**
 * Creates the streaming output that matches a requested MIME type.
 *
 * <p>On class initialization the package {@code org.apache.tajo.ws.rs.resources.outputs}
 * is searched for concrete {@link AbstractStreamingOutput} classes. Every class that
 * carries a {@link RestReturnType} annotation with a non-empty MIME type and declares
 * a {@code (NonForwardQueryResultScanner, Integer, Integer)} constructor is registered
 * under that MIME type. Requests for an unregistered MIME type are served by
 * {@link CSVStreamingOutput}.
 */
public class RestOutputFactory {
  private static final Log LOG = LogFactory.getLog(RestOutputFactory.class);

  /** Parameter types of the constructor every registered output class has to declare. */
  private static final Class<?>[] CONSTRUCTOR_SIGNATURE =
      {NonForwardQueryResultScanner.class, Integer.class, Integer.class};

  // Must stay below LOG and CONSTRUCTOR_SIGNATURE: load() uses both, and static
  // initializers run in textual order.
  private static final Map<String, Class<? extends AbstractStreamingOutput>> restOutputClasses = load();

  /**
   * Builds the MIME type to output class registry.
   *
   * <p>Classes are inspected, not instantiated: the MIME type lives in a class-level
   * annotation, so no probe object (and no {@code null} scanner) is needed. The class
   * object itself is stored, which avoids resolving it again by name later on.
   * A class that cannot be registered is logged and skipped so that one faulty
   * class cannot break the initialization of this factory.
   */
  private static Map<String, Class<? extends AbstractStreamingOutput>> load() {
    Map<String, Class<? extends AbstractStreamingOutput>> outputClasses = Maps.newHashMap();
    Set<Class> candidates = ClassUtil.findClasses(AbstractStreamingOutput.class, "org.apache.tajo.ws.rs.resources.outputs");

    for (Class<?> candidate : candidates) {
      if (candidate.isInterface() ||
          Modifier.isAbstract(candidate.getModifiers())) {
        continue;
      }

      Class<? extends AbstractStreamingOutput> outputClass;
      try {
        outputClass = candidate.asSubclass(AbstractStreamingOutput.class);
        // Only checks that the constructor used by get() exists.
        outputClass.getDeclaredConstructor(CONSTRUCTOR_SIGNATURE);
      } catch (Exception e) {
        LOG.warn(candidate + " cannot be registered as a REST output because of " + e.getMessage(), e);
        continue;
      }

      RestReturnType returnType = outputClass.getAnnotation(RestReturnType.class);
      if (returnType == null) {
        LOG.warn(candidate + " cannot be registered as a REST output because it has no RestReturnType annotation");
        continue;
      }

      String headerType = returnType.mimeType();
      if (StringUtils.isNotEmpty(headerType)) {
        outputClasses.put(headerType, outputClass);
      }
    }

    return outputClasses;
  }

  /**
   * Creates a streaming output for the given MIME type.
   *
   * @param mimeType requested MIME type; an unregistered (or {@code null}) value selects CSV
   * @param scanner scanner handed to the output's constructor
   * @param count row count handed to the output's constructor
   * @param startOffset start offset handed to the output's constructor
   * @return the new output, or {@code null} if it could not be created (the cause is logged)
   */
  public static AbstractStreamingOutput get(String mimeType, NonForwardQueryResultScanner scanner, Integer count, Integer startOffset) {
    try {
      Class<? extends AbstractStreamingOutput> outputClass = restOutputClasses.get(mimeType);
      if (outputClass == null) {
        return new CSVStreamingOutput(scanner, count, startOffset);
      }
      return outputClass.getDeclaredConstructor(CONSTRUCTOR_SIGNATURE)
          .newInstance(scanner, count, startOffset);
    } catch (Exception eh) {
      LOG.error("Cannot create a REST output for MIME type '" + mimeType + "'", eh);
      return null;
    }
  }
}
