Repository: tajo Updated Branches: refs/heads/branch-0.11.0 f0e22b76a -> 20c7e184c
http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index a4a916a..f7aff30 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -33,7 +33,6 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.physical.EvalExprExec; @@ -63,11 +62,14 @@ import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.session.Session; import org.apache.tajo.storage.*; +import org.apache.tajo.tuple.memory.MemoryBlock; +import org.apache.tajo.tuple.memory.MemoryRowBlock; import org.apache.tajo.util.ProtoUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.net.URI; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -217,21 +219,27 @@ public class QueryExecutor { Schema schema = new Schema(); schema.addColumn("explain", TajoDataTypes.Type.TEXT); - RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder(); - - VTuple tuple = new VTuple(1); + MemoryRowBlock rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema)); String[] lines = explainStr.split("\n"); - int bytesNum = 0; - for (String line : lines) { - tuple.put(0, DatumFactory.createText(line)); - byte [] encodedData = encoder.toBytes(tuple); - bytesNum += encodedData.length; - serializedResBuilder.addSerializedTuples(ByteString.copyFrom(encodedData)); + try { + for (String line : lines) { + rowBlock.getWriter().startRow(); + rowBlock.getWriter().putText(line); + rowBlock.getWriter().endRow(); + } + MemoryBlock memoryBlock = rowBlock.getMemory(); + ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes()); + int uncompressedLength = uncompressed.remaining(); + + serializedResBuilder.setDecompressedLength(uncompressedLength); + serializedResBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed)); + serializedResBuilder.setSchema(schema.getProto()); + serializedResBuilder.setRows(rowBlock.rows()); + } finally { + rowBlock.release(); } - serializedResBuilder.setSchema(schema.getProto()); - serializedResBuilder.setBytesNum(bytesNum); QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query, (LogicalRootNode) plan.getRootBlock().getRoot()); @@ -328,12 +336,23 @@ public class QueryExecutor { insertRowValues(queryContext, insertNode, responseBuilder); } else { Schema schema = PlannerUtil.targetToSchema(targets); - RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); - byte[] serializedBytes = encoder.toBytes(outTuple); SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder(); - serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); - serializedResBuilder.setSchema(schema.getProto()); - serializedResBuilder.setBytesNum(serializedBytes.length); + MemoryRowBlock rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema)); + + try { + rowBlock.getWriter().addTuple(outTuple); + + MemoryBlock memoryBlock = rowBlock.getMemory(); + ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes()); + int uncompressedLength = uncompressed.remaining(); + + serializedResBuilder.setDecompressedLength(uncompressedLength); + serializedResBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed)); + serializedResBuilder.setSchema(schema.getProto()); + serializedResBuilder.setRows(rowBlock.rows()); + } finally { + rowBlock.release(); + } QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query, (LogicalRootNode) plan.getRootBlock().getRoot()); http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 1f98457..68916e4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -772,8 +772,8 @@ public class Stage implements EventHandler<StageEvent> { // if store plan (i.e., CREATE or INSERT OVERWRITE) String storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan()); if (storeType == null) { - // get default or store type - storeType = "TEXT"; + // get final output store type (i.e., SELECT) + storeType = channel.getStoreType(); } schema = channel.getSchema(); http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java index 85dbdbe..9e35b3b 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java @@ -18,28 +18,25 @@ package org.apache.tajo.jdbc; -import com.google.protobuf.ServiceException; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.client.CatalogAdminClient; import org.apache.tajo.client.QueryClient; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; -import org.apache.tajo.client.v2.exception.ClientConnectionException; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.error.Errors; +import org.apache.tajo.exception.SQLExceptionUtil; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.exception.*; import org.apache.tajo.jdbc.util.QueryStringDecoder; import org.apache.tajo.rpc.RpcUtils; import org.apache.tajo.util.KeyValueSet; -import java.io.IOException; -import java.net.ConnectException; import java.net.URI; import java.sql.*; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; @@ -106,6 +103,7 @@ public class JdbcConnection implements Connection { } params = new QueryStringDecoder(rawURI).getParameters(); + properties.putAll(params); } catch (SQLException se) { throw se; } catch (Throwable t) { // for unexpected exceptions like ArrayIndexOutOfBoundsException. @@ -115,7 +113,11 @@ public class JdbcConnection implements Connection { clientProperties = new KeyValueSet(); if(properties != null) { for(Map.Entry<Object, Object> entry: properties.entrySet()) { - clientProperties.set(entry.getKey().toString(), entry.getValue().toString()); + if(entry.getValue() instanceof Collection) { + clientProperties.set(entry.getKey().toString(), StringUtils.join((Collection) entry.getValue(), ",")); + } else { + clientProperties.set(entry.getKey().toString(), entry.getValue().toString()); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java index 0c83fd0..4c926bb 100644 --- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java +++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java @@ -21,23 +21,24 @@ */ package org.apache.tajo.jdbc; -import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.hadoop.fs.Path; import org.apache.tajo.IntegrationTest; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.TpchTestBase; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.TajoProtos.CodecType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; import org.apache.tajo.storage.*; +import org.apache.tajo.tuple.memory.MemoryBlock; +import org.apache.tajo.tuple.memory.MemoryRowBlock; +import org.apache.tajo.util.CompressionUtil; import org.apache.tajo.util.KeyValueSet; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -45,9 +46,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.io.IOException; +import java.nio.ByteBuffer; import java.sql.*; import java.util.Calendar; -import java.util.List; import java.util.TimeZone; import static org.junit.Assert.*; @@ -60,7 +61,7 @@ public class TestResultSet { private static FileTablespace sm; private static TableMeta scoreMeta; private static Schema scoreSchema; - private static List<ByteString> serializedData; + private static MemoryRowBlock rowBlock; @BeforeClass public static void setup() throws Exception { @@ -72,13 +73,12 @@ public class TestResultSet { scoreSchema.addColumn("deptname", Type.TEXT); scoreSchema.addColumn("score", Type.INT4); scoreMeta = CatalogUtil.newTableMeta("TEXT"); + rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(scoreSchema)); TableStats stats = new TableStats(); Path p = new Path(sm.getTableUri("default", "score")); sm.getFileSystem().mkdirs(p); Appender appender = sm.getAppender(scoreMeta, scoreSchema, new Path(p, "score")); - RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(scoreSchema); - serializedData = Lists.newArrayList(); appender.init(); int deptSize = 100; @@ -92,7 +92,7 @@ public class TestResultSet { tuple.put(1, DatumFactory.createInt4(i + 1)); written += key.length() + Integer.SIZE; appender.addTuple(tuple); - serializedData.add(ByteString.copyFrom(encoder.toBytes(tuple))); + rowBlock.getWriter().addTuple(tuple); } appender.close(); stats.setNumRows(tupleNum); @@ -103,18 +103,67 @@ public class TestResultSet { desc = new TableDesc(CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score"), scoreSchema, scoreMeta, p.toUri()); desc.setStats(stats); + assertEquals(tupleNum, rowBlock.rows()); } @AfterClass public static void terminate() throws IOException { - + rowBlock.release(); } @Test public void testMemoryResultSet() throws Exception { - TajoMemoryResultSet rs = new TajoMemoryResultSet(null, desc.getSchema(), - serializedData, desc.getStats().getNumRows().intValue(), null); + SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder(); + resultSetBuilder.setSchema(scoreSchema.getProto()); + resultSetBuilder.setRows(rowBlock.rows()); + + MemoryBlock memoryBlock = rowBlock.getMemory(); + ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes()); + resultSetBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed)); + resultSetBuilder.setDecompressedLength(memoryBlock.readableBytes()); + + TajoMemoryResultSet rs = new TajoMemoryResultSet(null, desc.getSchema(), resultSetBuilder.build(), null); + + ResultSetMetaData meta = rs.getMetaData(); + assertNotNull(meta); + Schema schema = scoreSchema; + assertEquals(schema.size(), meta.getColumnCount()); + for (int i = 0; i < meta.getColumnCount(); i++) { + assertEquals(schema.getColumn(i).getSimpleName(), meta.getColumnName(i + 1)); + assertEquals(schema.getColumn(i).getQualifier(), meta.getTableName(i + 1)); + } + + int i = 0; + assertTrue(rs.isBeforeFirst()); + for (; rs.next(); i++) { + assertEquals("test"+i%100, rs.getString(1)); + assertEquals("test"+i%100, rs.getString("deptname")); + assertEquals(i+1, rs.getInt(2)); + assertEquals(i+1, rs.getInt("score")); + } + assertEquals(10000, i); + assertTrue(rs.isAfterLast()); + } + + + @Test + public void testCompressedMemoryResultSet() throws Exception { + SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder(); + resultSetBuilder.setSchema(scoreSchema.getProto()); + resultSetBuilder.setRows(rowBlock.rows()); + + MemoryBlock memoryBlock = rowBlock.getMemory(); + byte[] uncompressedBytes = new byte[memoryBlock.readableBytes()]; + memoryBlock.getBuffer().getBytes(0, uncompressedBytes); + + // compress by snappy + byte[] compressedBytes = CompressionUtil.compress(CodecType.SNAPPY, uncompressedBytes); + resultSetBuilder.setDecompressedLength(uncompressedBytes.length); + resultSetBuilder.setDecompressCodec(CodecType.SNAPPY); + resultSetBuilder.setSerializedTuples(ByteString.copyFrom(compressedBytes)); + + TajoMemoryResultSet rs = new TajoMemoryResultSet(null, desc.getSchema(), resultSetBuilder.build(), null); ResultSetMetaData meta = rs.getMetaData(); assertNotNull(meta); Schema schema = scoreSchema; http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java index 40d7e58..a860d51 100644 --- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java +++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java @@ -194,6 +194,91 @@ public class TestTajoJdbc extends QueryTestCaseBase { } @Test + public void testResultSetCompression() throws Exception { + String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), + TajoConstants.DEFAULT_DATABASE_NAME); + connUri = connUri + "?" + SessionVars.COMPRESSED_RESULT_TRANSFER.keyname() + "=true"; + Connection conn = DriverManager.getConnection(connUri); + assertTrue(conn.isValid(100)); + + PreparedStatement stmt = null; + ResultSet res = null; + try { + /* + test data set + 1,17.0,N + 1,36.0,N + 2,38.0,N + 3,45.0,R + 3,49.0,R + */ + + String sql = + "select l_orderkey, l_quantity, l_returnflag from lineitem where l_quantity > ? and l_returnflag = ?"; + + stmt = conn.prepareStatement(sql); + + stmt.setInt(1, 20); + stmt.setString(2, "N"); + + res = stmt.executeQuery(); + + ResultSetMetaData rsmd = res.getMetaData(); + assertEquals(3, rsmd.getColumnCount()); + assertEquals("l_orderkey", rsmd.getColumnName(1)); + assertEquals("l_quantity", rsmd.getColumnName(2)); + assertEquals("l_returnflag", rsmd.getColumnName(3)); + + try { + int numRows = 0; + String[] resultData = {"136.0N", "238.0N"}; + while (res.next()) { + assertEquals(resultData[numRows], + ("" + res.getObject(1).toString() + res.getObject(2).toString() + res.getObject(3).toString())); + numRows++; + } + assertEquals(2, numRows); + } finally { + res.close(); + } + + stmt.setInt(1, 20); + stmt.setString(2, "R"); + + res = stmt.executeQuery(); + + rsmd = res.getMetaData(); + assertEquals(3, rsmd.getColumnCount()); + assertEquals("l_orderkey", rsmd.getColumnName(1)); + assertEquals("l_quantity", rsmd.getColumnName(2)); + assertEquals("l_returnflag", rsmd.getColumnName(3)); + + try { + int numRows = 0; + String[] resultData = {"345.0R", "349.0R"}; + while (res.next()) { + assertEquals(resultData[numRows], + ("" + res.getObject(1).toString() + res.getObject(2).toString() + res.getObject(3).toString())); + numRows++; + } + assertEquals(2, numRows); + } finally { + res.close(); + } + } finally { + if (res != null) { + res.close(); + } + if (stmt != null) { + stmt.close(); + } + if (conn != null) { + conn.close(); + } + } + } + + @Test public void testDatabaseMetaDataGetTable() throws Exception { String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), TajoConstants.DEFAULT_DATABASE_NAME); @@ -584,9 +669,8 @@ public class TestTajoJdbc extends QueryTestCaseBase { try { assertTrue("should have result set", statement.execute()); TajoResultSetBase result = (TajoResultSetBase) statement.getResultSet(); - Thread.sleep(1000); // todo query master is not killed properly if it's compiling the query (use 100, if you want see) statement.cancel(); - + Thread.sleep(1000); QueryStatus status = client.getQueryStatus(result.getQueryId()); assertEquals(TajoProtos.QueryState.QUERY_KILLED, status.getState()); } finally { http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java index c892eda..a28f01f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java @@ -21,6 +21,7 @@ package org.apache.tajo.plan.logical; import com.google.gson.annotations.Expose; +import org.apache.tajo.BuiltinStorages; import org.apache.tajo.plan.PlanString; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; @@ -31,7 +32,7 @@ import org.apache.tajo.util.TUtil; * This includes some basic information for materializing data. */ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable { - @Expose protected String storageType = "TEXT"; + @Expose protected String storageType = BuiltinStorages.TEXT; @Expose protected KeyValueSet options = new KeyValueSet(); protected PersistentStoreNode(int pid, NodeType nodeType) { http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-project/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index b6a13af..903976f 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -1153,6 +1153,11 @@ <artifactId>json-path</artifactId> <version>2.0.0</version> </dependency> + <dependency> + <groupId>org.iq80.snappy</groupId> + <artifactId>snappy</artifactId> + <version>0.4</version> + </dependency> </dependencies> </dependencyManagement> <profiles> http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java index aa7ba67..598c8e8 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java @@ -183,11 +183,13 @@ public class RpcClientManager { public static void close() { LOG.info("Closing RPC client manager"); - for (NettyClientBase eachClient : clients.values()) { - try { - eachClient.close(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); + synchronized (clients) { + for (NettyClientBase eachClient : clients.values()) { + try { + eachClient.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } } } }
