HIVE-12049: HiveServer2: Provide an option to write serialized thrift objects in final tasks (Rohit Dholakia reviewed by Ashutosh Chauhan, Gopal Vijayaraghavan, Lefty Leverenz, Vaibhav Gumashta)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fb230f9d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fb230f9d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fb230f9d Branch: refs/heads/master Commit: fb230f9df5b7c990c80326671d9975a6f05e1600 Parents: 145e253 Author: Vaibhav Gumashta <[email protected]> Authored: Fri Apr 22 12:22:55 2016 -0700 Committer: Vaibhav Gumashta <[email protected]> Committed: Fri Apr 22 12:22:55 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hive/beeline/Commands.java | 2 +- .../org/apache/hadoop/hive/conf/HiveConf.java | 11 + .../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 108 ++++- .../org/apache/hive/jdbc/HiveBaseResultSet.java | 2 +- .../apache/hive/jdbc/HiveResultSetMetaData.java | 2 +- .../org/apache/hive/jdbc/HiveStatement.java | 16 +- .../java/org/apache/hive/jdbc/JdbcColumn.java | 2 +- ql/pom.xml | 6 + .../java/org/apache/hadoop/hive/ql/Driver.java | 19 + .../hive/ql/exec/DefaultFetchFormatter.java | 77 ---- .../hadoop/hive/ql/exec/FetchFormatter.java | 71 --- .../apache/hadoop/hive/ql/exec/FetchTask.java | 3 +- .../hadoop/hive/ql/exec/FileSinkOperator.java | 27 +- .../hadoop/hive/ql/exec/ListSinkOperator.java | 11 +- .../org/apache/hadoop/hive/ql/parse/QB.java | 5 + .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 22 +- .../hadoop/hive/ql/parse/TaskCompiler.java | 62 ++- .../apache/hadoop/hive/ql/plan/FetchWork.java | 10 + .../hadoop/hive/ql/plan/FileSinkDesc.java | 9 + .../apache/hadoop/hive/ql/plan/PlanUtils.java | 10 +- serde/pom.xml | 5 + .../hive/serde2/DefaultFetchFormatter.java | 73 +++ .../hadoop/hive/serde2/FetchFormatter.java | 37 ++ .../hadoop/hive/serde2/NoOpFetchFormatter.java | 48 ++ .../apache/hadoop/hive/serde2/SerDeUtils.java | 3 +- .../hadoop/hive/serde2/thrift/ColumnBuffer.java | 439 +++++++++++++++++++ .../hive/serde2/thrift/ThriftFormatter.java | 40 ++ 
.../serde2/thrift/ThriftJDBCBinarySerDe.java | 178 ++++++++ .../apache/hadoop/hive/serde2/thrift/Type.java | 438 ++++++++++++++++++ .../hadoop/hive/serde2/typeinfo/TypeInfo.java | 14 +- service-rpc/if/TCLIService.thrift | 2 + .../gen/thrift/gen-cpp/TCLIService_types.cpp | 44 ++ .../src/gen/thrift/gen-cpp/TCLIService_types.h | 20 +- .../apache/hive/service/rpc/thrift/TRowSet.java | 222 +++++++++- service-rpc/src/gen/thrift/gen-php/Types.php | 46 ++ .../src/gen/thrift/gen-py/TCLIService/ttypes.py | 28 +- .../gen/thrift/gen-rb/t_c_l_i_service_types.rb | 6 +- .../org/apache/hive/service/cli/Column.java | 434 ------------------ .../apache/hive/service/cli/ColumnBasedSet.java | 84 +++- .../hive/service/cli/ColumnDescriptor.java | 12 +- .../apache/hive/service/cli/ColumnValue.java | 1 + .../apache/hive/service/cli/RowSetFactory.java | 17 +- .../apache/hive/service/cli/TableSchema.java | 4 +- .../java/org/apache/hive/service/cli/Type.java | 348 --------------- .../apache/hive/service/cli/TypeDescriptor.java | 1 + .../cli/operation/GetCatalogsOperation.java | 2 +- .../cli/operation/GetColumnsOperation.java | 4 +- .../cli/operation/GetFunctionsOperation.java | 8 +- .../cli/operation/GetSchemasOperation.java | 5 +- .../cli/operation/GetTableTypesOperation.java | 9 +- .../cli/operation/GetTablesOperation.java | 2 +- .../cli/operation/GetTypeInfoOperation.java | 4 +- .../cli/operation/HiveCommandOperation.java | 4 +- .../service/cli/operation/OperationManager.java | 12 +- .../service/cli/operation/SQLOperation.java | 20 +- .../service/cli/session/HiveSessionImpl.java | 8 +- .../service/cli/thrift/ThriftCLIService.java | 2 +- .../apache/hive/service/cli/CLIServiceTest.java | 2 +- .../org/apache/hive/service/cli/TestColumn.java | 14 +- 59 files changed, 2066 insertions(+), 1049 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/beeline/src/java/org/apache/hive/beeline/Commands.java 
---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java index 22e2066..0178333 100644 --- a/beeline/src/java/org/apache/hive/beeline/Commands.java +++ b/beeline/src/java/org/apache/hive/beeline/Commands.java @@ -1179,7 +1179,7 @@ public class Commands { private void showRemainingLogsIfAny(Statement statement) { if (statement instanceof HiveStatement) { HiveStatement hiveStatement = (HiveStatement) statement; - List<String> logs; + List<String> logs = null; do { try { logs = hiveStatement.getQueryLog(); http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5cf1609..c52b9d9 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2037,6 +2037,7 @@ public class HiveConf extends Configuration { new TimeValidator(TimeUnit.SECONDS), "Number of seconds a request will wait to acquire the compile lock before giving up. " + "Setting it to 0s disables the timeout."), + // HiveServer2 WebUI HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"), HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on. This can be" @@ -2167,6 +2168,7 @@ public class HiveConf extends Configuration { new TimeValidator(TimeUnit.SECONDS), "Keepalive time (in seconds) for an idle worker thread. 
When the number of workers exceeds min workers, " + "excessive threads are killed after this time interval."), + // Configuration for async thread pool in SessionManager HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100, "Number of threads in the async thread pool for HiveServer2"), @@ -2330,6 +2332,14 @@ public class HiveConf extends Configuration { HIVE_SERVER2_THRIFT_CLIENT_PASSWORD("hive.server2.thrift.client.password", "anonymous","Password to use against " + "thrift client"), + // ResultSet serialization settings + HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS("hive.server2.thrift.resultset.serialize.in.tasks", false, + "Whether we should serialize the Thrift structures used in JDBC ResultSet RPC in task nodes.\n " + + "We use SequenceFile and ThriftJDBCBinarySerDe to read and write the final results if this is true."), + // TODO: Make use of this config to configure fetch size + HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE("hive.server2.thrift.resultset.max.fetch.size", 1000, + "Max number of rows sent in one Fetch RPC call by the server to the client."), + HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile", "Comma separated list of non-SQL Hive commands users are authorized to execute"), @@ -3646,6 +3656,7 @@ public class HiveConf extends Configuration { ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname, ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS.varname, ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname, + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS.varname, ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, ConfVars.JOB_DEBUG_CAPTURE_STACKTRACES.varname, ConfVars.JOB_DEBUG_TIMEOUT.varname, http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java ---------------------------------------------------------------------- diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 10c8ff2..815ccfa 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -431,6 +431,112 @@ public class TestJdbcWithMiniHS2 { } } + private void setSerializeInTasksInConf(HiveConf conf) { + conf.setBoolean("hive.server2.thrift.resultset.serialize.in.tasks", true); + conf.setInt("hive.server2.thrift.resultset.max.fetch.size", 1000); + } + + @Test + public void testMetadataQueriesWithSerializeThriftInTasks() throws Exception { + //stop HiveServer2 + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + + HiveConf conf = new HiveConf(); + String userName; + setSerializeInTasksInConf(conf); + miniHS2 = new MiniHS2(conf); + Map<String, String> confOverlay = new HashMap<String, String>(); + miniHS2.start(confOverlay); + + userName = System.getProperty("user.name"); + hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + Statement stmt = hs2Conn.createStatement(); + stmt.execute("drop table if exists testThriftSerializeShow"); + stmt.execute("create table testThriftSerializeShow (a int)"); + ResultSet rs = stmt.executeQuery("show tables"); + assertTrue(rs.next()); + stmt.execute("describe testThriftSerializeShow"); + stmt.execute("explain select a from testThriftSerializeShow"); + stmt.execute("drop table testThriftSerializeShow"); + stmt.close(); + } + + @Test + public void testSelectThriftSerializeInTasks() throws Exception { + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + + HiveConf conf = new HiveConf(); + String userName; + setSerializeInTasksInConf(conf); + miniHS2 = new MiniHS2(conf); + Map<String, String> confOverlay = new HashMap<String, String>(); + miniHS2.start(confOverlay); + + userName = System.getProperty("user.name"); + hs2Conn = 
getConnection(miniHS2.getJdbcURL(), userName, "password"); + Statement stmt = hs2Conn.createStatement(); + + stmt.execute("drop table if exists testSelectThriftOrders"); + stmt.execute("drop table if exists testSelectThriftCustomers"); + stmt.execute("create table testSelectThriftOrders (orderid int, orderdate string, customerid int)"); + stmt.execute("create table testSelectThriftCustomers (customerid int, customername string, customercountry string)"); + stmt.execute("insert into testSelectThriftOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)"); + stmt.execute("insert into testSelectThriftCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')"); + ResultSet countOrders = stmt.executeQuery("select count(*) from testSelectThriftOrders"); + while (countOrders.next()) { + assertEquals(3, countOrders.getInt(1)); + } + ResultSet maxOrders = stmt.executeQuery("select max(customerid) from testSelectThriftCustomers"); + while (maxOrders.next()) { + assertEquals(356, maxOrders.getInt(1)); + } + stmt.execute("drop table testSelectThriftOrders"); + stmt.execute("drop table testSelectThriftCustomers"); + stmt.close(); + } + + @Test + public void testJoinThriftSerializeInTasks() throws Exception { + //stop HiveServer2 + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + HiveConf conf = new HiveConf(); + String userName; + + setSerializeInTasksInConf(conf); + + miniHS2 = new MiniHS2(conf); + Map<String, String> confOverlay = new HashMap<String, String>(); + miniHS2.start(confOverlay); + + userName = System.getProperty("user.name"); + hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + Statement stmt = hs2Conn.createStatement(); + stmt.execute("drop table if exists testThriftJoinOrders"); + stmt.execute("drop table if exists testThriftJoinCustomers"); + stmt.execute("create table testThriftJoinOrders (orderid int, orderdate string, customerid int)"); + stmt.execute("create table 
testThriftJoinCustomers (customerid int, customername string, customercountry string)"); + stmt.execute("insert into testThriftJoinOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)"); + stmt.execute("insert into testThriftJoinCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')"); + ResultSet joinResultSet = stmt.executeQuery("select testThriftJoinOrders.orderid, testThriftJoinCustomers.customername from testThriftJoinOrders inner join testThriftJoinCustomers where testThriftJoinOrders.customerid=testThriftJoinCustomers.customerid"); + Map<Integer, String> expectedResult = new HashMap<Integer, String>(); + expectedResult.put(1, "David"); + expectedResult.put(2, "John"); + expectedResult.put(3, "Mary"); + for (int i = 1; i < 4; i++) { + assertTrue(joinResultSet.next()); + assertEquals(joinResultSet.getString(2), expectedResult.get(i)); + } + stmt.execute("drop table testThriftJoinOrders"); + stmt.execute("drop table testThriftJoinCustomers"); + stmt.close(); + } + /** * Tests the creation of the 3 scratch dirs: hdfs, local, downloaded resources (which is also local). * 1. 
Test with doAs=false: open a new JDBC session and verify the presence of directories/permissions @@ -810,4 +916,4 @@ public class TestJdbcWithMiniHS2 { } return -1; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java index 88ba853..93f093f 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java @@ -46,8 +46,8 @@ import java.util.Map; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.cli.TableSchema; -import org.apache.hive.service.cli.Type; /** * Data independent base class which implements the common part of http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java index 16a0894..f6c38d8 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java @@ -22,7 +22,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.List; -import org.apache.hive.service.cli.Type; +import org.apache.hadoop.hive.serde2.thrift.Type; /** * HiveResultSetMetaData. 
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index f5b9672..3cc6b74 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -35,6 +35,7 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq; import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp; import org.apache.hive.service.rpc.thrift.TOperationHandle; import org.apache.hive.service.rpc.thrift.TSessionHandle; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -881,15 +882,22 @@ public class HiveStatement implements java.sql.Statement { } } catch (SQLException e) { throw e; + } catch (TException e) { + throw new SQLException("Error when getting query log: " + e, e); } catch (Exception e) { throw new SQLException("Error when getting query log: " + e, e); } - RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), - connection.getProtocol()); - for (Object[] row : rowSet) { - logs.add(String.valueOf(row[0])); + try { + RowSet rowSet; + rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol()); + for (Object[] row : rowSet) { + logs.add(String.valueOf(row[0])); + } + } catch (TException e) { + throw new SQLException("Error building result set for query log: " + e, e); } + return logs; } http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java index 691fd0e..5aed679 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java +++ 
b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java @@ -27,7 +27,7 @@ import java.sql.Types; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hive.service.cli.Type; +import org.apache.hadoop.hive.serde2.thrift.Type; /** http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/pom.xml ---------------------------------------------------------------------- diff --git a/ql/pom.xml b/ql/pom.xml index ebb9599..aaa3271 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -70,6 +70,11 @@ </dependency> <dependency> <groupId>org.apache.hive</groupId> + <artifactId>hive-service-rpc</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> <artifactId>hive-llap-client</artifactId> <version>${project.version}</version> </dependency> @@ -803,6 +808,7 @@ <include>org.apache.hive:hive-serde</include> <include>org.apache.hive:hive-llap-client</include> <include>org.apache.hive:hive-metastore</include> + <include>org.apache.hive:hive-service-rpc</include> <include>com.esotericsoftware:kryo-shaded</include> <include>com.esotericsoftware:minlog</include> <include>org.objenesis:objenesis</include> http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 65744ac..48fb060 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -108,6 +108,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.ByteStream; 
+import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; @@ -932,6 +933,13 @@ public class Driver implements CommandProcessor { return plan; } + /** + * @return The current FetchTask associated with the Driver's plan, if any. + */ + public FetchTask getFetchTask() { + return fetchTask; + } + // Write the current set of valid transactions into the conf file so that it can be read by // the input format. private void recordValidTxns() throws LockException { @@ -1880,6 +1888,17 @@ public class Driver implements CommandProcessor { throw new IOException("FAILED: Operation cancelled"); } if (isFetchingTable()) { + /** + * If resultset serialization to thrift object is enabled, and if the destination table is + * indeed written using ThriftJDBCBinarySerDe, read one row from the output sequence file, + * since it is a blob of row batches. + */ + if (fetchTask.getWork().isHiveServerQuery() && HiveConf.getBoolVar(conf, + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS) + && (fetchTask.getTblDesc().getSerdeClassName() + .equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()))) { + maxRows = 1; + } fetchTask.setMaxRows(maxRows); return fetchTask.fetch(res); } http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java deleted file mode 100644 index b8be3a5..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec; - -import java.io.IOException; -import java.util.Properties; - -import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; -import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hive.common.util.ReflectionUtil; - -/** - * serialize row by user specified serde and call toString() to make string type result - */ -public class DefaultFetchFormatter<T> implements FetchFormatter<String> { - - private SerDe mSerde; - - @Override - public void initialize(Configuration hconf, Properties props) throws HiveException { - try { - mSerde = initializeSerde(hconf, props); - } catch (Exception e) { - throw new HiveException(e); - } - } - - private SerDe initializeSerde(Configuration conf, Properties props) throws Exception { - String serdeName = 
HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE); - Class<? extends SerDe> serdeClass = Class.forName(serdeName, true, - Utilities.getSessionSpecifiedClassLoader()).asSubclass(SerDe.class); - // cast only needed for Hadoop 0.17 compatibility - SerDe serde = ReflectionUtil.newInstance(serdeClass, null); - - Properties serdeProps = new Properties(); - if (serde instanceof DelimitedJSONSerDe) { - serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT)); - serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT)); - } - SerDeUtils.initializeSerDe(serde, conf, serdeProps, null); - return serde; - } - - @Override - public String convert(Object row, ObjectInspector rowOI) throws Exception { - return mSerde.serialize(row, rowOI).toString(); - } - - @Override - public void close() throws IOException { - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java deleted file mode 100644 index c2ed0d6..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; - -/** - * internal-use only - * - * Used in ListSinkOperator for formatting final output - */ -public interface FetchFormatter<T> extends Closeable { - - void initialize(Configuration hconf, Properties props) throws Exception; - - T convert(Object row, ObjectInspector rowOI) throws Exception; - - public static class ThriftFormatter implements FetchFormatter<Object> { - - int protocol; - - @Override - public void initialize(Configuration hconf, Properties props) throws Exception { - protocol = hconf.getInt(ListSinkOperator.OUTPUT_PROTOCOL, 0); - } - - @Override - public Object convert(Object row, ObjectInspector rowOI) throws Exception { - StructObjectInspector structOI = (StructObjectInspector) rowOI; - List<? extends StructField> fields = structOI.getAllStructFieldRefs(); - - Object[] converted = new Object[fields.size()]; - for (int i = 0 ; i < converted.length; i++) { - StructField fieldRef = fields.get(i); - Object field = structOI.getStructFieldData(row, fieldRef); - converted[i] = field == null ? 
null : - SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector(), protocol); - } - return converted; - } - - @Override - public void close() throws IOException { - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 0b0c336..b96ea04 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -48,12 +48,10 @@ import org.apache.hadoop.util.StringUtils; **/ public class FetchTask extends Task<FetchWork> implements Serializable { private static final long serialVersionUID = 1L; - private int maxRows = 100; private FetchOperator fetch; private ListSinkOperator sink; private int totalRows; - private static transient final Logger LOG = LoggerFactory.getLogger(FetchTask.class); public FetchTask() { @@ -186,4 +184,5 @@ public class FetchTask extends Task<FetchWork> implements Serializable { fetch.clearFetchContext(); } } + } http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index ec6381b..3ec63ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim; import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue; import org.apache.hadoop.hive.shims.ShimLoader; @@ -122,7 +123,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements protected transient long numRows = 0; protected transient long cntr = 1; protected transient long logEveryNRows = 0; - + protected transient int rowIndex = 0; /** * Counters. */ @@ -374,7 +375,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements // half of the script.timeout but less than script.timeout, we will still // be able to report progress. timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2; - if (hconf instanceof JobConf) { jc = (JobConf) hconf; } else { @@ -656,6 +656,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements protected Writable recordValue; + @Override public void process(Object row, int tag) throws HiveException { /* Create list bucketing sub-directory only if stored-as-directories is on. 
*/ @@ -717,8 +718,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } else { fpaths = fsp; } - // use SerDe to serialize r, and write it out recordValue = serializer.serialize(row, inputObjInspectors[0]); + // if serializer is ThriftJDBCBinarySerDe, then recordValue is null if the buffer is not full (the size of buffer + // is kept track of in the SerDe) + if (recordValue == null) { + return; + } } rowOutWriters = fpaths.outWriters; @@ -745,6 +750,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements LOG.info(toString() + ": records written - " + numRows); } + // This should always be 0 for the final result file int writerOffset = findWriterOffset(row); // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same // for a given operator branch prediction should work quite nicely on it. @@ -1012,9 +1018,22 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements lastProgressReport = System.currentTimeMillis(); if (!abort) { + // If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size) + // and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full + // (the size of buffer is kept track of in the ThriftJDBCBinarySerDe). 
+ if (conf.isHiveServerQuery() && HiveConf.getBoolVar(hconf, + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS) && + serializer.getClass().getName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) { + try { + recordValue = serializer.serialize(null, inputObjInspectors[0]); + rowOutWriters = fpaths.outWriters; + rowOutWriters[0].write(recordValue); + } catch (SerDeException | IOException e) { + throw new HiveException(e); + } + } for (FSPaths fsp : valToPaths.values()) { fsp.closeWriters(abort); - // before closing the operator check if statistics gathering is requested // and is provided by record writer. this is different from the statistics // gathering done in processOp(). In processOp(), for each row added http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java index b081cd0..9bf363c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java @@ -27,6 +27,9 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ListSinkDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.DefaultFetchFormatter; +import org.apache.hadoop.hive.serde2.FetchFormatter; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.util.ReflectionUtils; /** @@ -34,10 +37,6 @@ import org.apache.hadoop.util.ReflectionUtils; * and finally arrives to this operator. 
*/ public class ListSinkOperator extends Operator<ListSinkDesc> { - - public static final String OUTPUT_FORMATTER = "output.formatter"; - public static final String OUTPUT_PROTOCOL = "output.protocol"; - private transient List res; private transient FetchFormatter fetcher; private transient int numRows; @@ -62,7 +61,7 @@ public class ListSinkOperator extends Operator<ListSinkDesc> { } private FetchFormatter initializeFetcher(Configuration conf) throws Exception { - String formatterName = conf.get(OUTPUT_FORMATTER); + String formatterName = conf.get(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER); FetchFormatter fetcher; if (formatterName != null && !formatterName.isEmpty()) { Class<? extends FetchFormatter> fetcherClass = Class.forName(formatterName, true, @@ -71,12 +70,10 @@ public class ListSinkOperator extends Operator<ListSinkDesc> { } else { fetcher = new DefaultFetchFormatter(); } - // selectively used by fetch formatter Properties props = new Properties(); props.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.tabCode); props.put(serdeConstants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat()); - fetcher.initialize(conf, props); return fetcher; } http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java index cf3bbf0..de7b151 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java @@ -263,6 +263,11 @@ public class QB { this.isQuery = isQuery; } + /** + * Set to true in SemanticAnalyzer.getMetadataForDestFile, + * if destination is a file and query is not CTAS + * @return + */ public boolean getIsQuery() { return isQuery; } 
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 96df189..005b53f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -203,6 +203,7 @@ import org.apache.hadoop.hive.ql.util.ResourceDownloader; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; +import org.apache.hadoop.hive.serde2.NoOpFetchFormatter; import org.apache.hadoop.hive.serde2.NullStructSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -214,6 +215,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -6834,8 +6836,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (tblDesc == null) { if (qb.getIsQuery()) { - String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat); + String fileFormat; + if (SessionState.get().isHiveServerQuery() && + 
conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) { + fileFormat = "SequenceFile"; + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat); + table_desc= + PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, + ThriftJDBCBinarySerDe.class); + // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll + // write out formatted thrift objects to SequenceFile + conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName()); + } else { + fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); + table_desc = + PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, + LazySimpleSerDe.class); + } } else { table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes); } @@ -6907,6 +6924,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { dpCtx, dest_path); + fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery()); // If this is an insert, update, or delete on an ACID table then mark that so the // FileSinkOperator knows how to properly write to it. 
if (destTableIsAcid) { http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index f7d7a40..75ca5f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -58,7 +58,15 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.DefaultFetchFormatter; +import org.apache.hadoop.hive.serde2.NoOpFetchFormatter; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; +import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import com.google.common.collect.Interner; import com.google.common.collect.Interners; @@ -97,6 +105,20 @@ public abstract class TaskCompiler { int outerQueryLimit = pCtx.getQueryProperties().getOuterQueryLimit(); if (pCtx.getFetchTask() != null) { + if (pCtx.getFetchTask().getTblDesc() == null) { + return; + } + pCtx.getFetchTask().getWork().setHiveServerQuery(SessionState.get().isHiveServerQuery()); + TableDesc resultTab = pCtx.getFetchTask().getTblDesc(); + // If the serializer is ThriftJDBCBinarySerDe, then it requires that NoOpFetchFormatter be used. But when it isn't, + // then either the ThriftFormatter or the DefaultFetchFormatter should be used. 
+ if (!resultTab.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) { + if (SessionState.get().isHiveServerQuery()) { + conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER,ThriftFormatter.class.getName()); + } else { + conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, DefaultFetchFormatter.class.getName()); + } + } return; } @@ -117,13 +139,34 @@ public abstract class TaskCompiler { String cols = loadFileDesc.getColumns(); String colTypes = loadFileDesc.getColumnTypes(); + String resFileFormat; TableDesc resultTab = pCtx.getFetchTableDesc(); if (resultTab == null) { - String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); + resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); + if (SessionState.get().isHiveServerQuery() && (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) + && (resFileFormat.equalsIgnoreCase("SequenceFile"))) { + resultTab = + PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat, + ThriftJDBCBinarySerDe.class); + // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll + // read formatted thrift objects from the output SequenceFile written by Tasks. + conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName()); + } else { + resultTab = + PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat, + LazySimpleSerDe.class); + } + } else { + if (resultTab.getProperties().getProperty(serdeConstants.SERIALIZATION_LIB) + .equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) { + // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll + // read formatted thrift objects from the output SequenceFile written by Tasks. 
+ conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName()); + } } FetchWork fetch = new FetchWork(loadFileDesc.getSourcePath(), resultTab, outerQueryLimit); + fetch.setHiveServerQuery(SessionState.get().isHiveServerQuery()); fetch.setSource(pCtx.getFetchSource()); fetch.setSink(pCtx.getFetchSink()); @@ -322,8 +365,19 @@ public abstract class TaskCompiler { String cols = loadFileWork.get(0).getColumns(); String colTypes = loadFileWork.get(0).getColumnTypes(); - String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); + String resFileFormat; + TableDesc resultTab; + if (SessionState.get().isHiveServerQuery() && conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) { + resFileFormat = "SequenceFile"; + resultTab = + PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat, + ThriftJDBCBinarySerDe.class); + } else { + resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); + resultTab = + PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat, + LazySimpleSerDe.class); + } fetch = new FetchWork(loadFileWork.get(0).getSourcePath(), resultTab, outerQueryLimit); http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java index d68c64c..8ea6440 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java @@ -63,6 +63,16 @@ public class FetchWork implements Serializable { */ private String serializationNullFormat = "NULL"; + private boolean isHiveServerQuery; + + public boolean isHiveServerQuery() { + 
return isHiveServerQuery; + } + + public void setHiveServerQuery(boolean isHiveServerQuery) { + this.isHiveServerQuery = isHiveServerQuery; + } + public FetchWork() { } http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 07fd2dc..0064fca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { private transient Table table; private Path destPath; + private boolean isHiveServerQuery; public FileSinkDesc() { } @@ -160,6 +161,14 @@ public class FileSinkDesc extends AbstractOperatorDesc { return ret; } + public boolean isHiveServerQuery() { + return this.isHiveServerQuery; + } + + public void setHiveServerQuery(boolean isHiveServerQuery) { + this.isHiveServerQuery = isHiveServerQuery; + } + @Explain(displayName = "directory", explainLevels = { Level.EXTENDED }) public Path getDirName() { return dirName; http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index 2992568..c39a46f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -280,13 +280,13 @@ public final class PlanUtils { } public static TableDesc getDefaultQueryOutputTableDesc(String cols, String colTypes, - String fileFormat) { - TableDesc tblDesc = getTableDesc(LazySimpleSerDe.class, "" + Utilities.ctrlaCode, cols, colTypes, - false, 
false, fileFormat); - //enable escaping + String fileFormat, Class<? extends Deserializer> serdeClass) { + TableDesc tblDesc = + getTableDesc(serdeClass, "" + Utilities.ctrlaCode, cols, colTypes, false, false, fileFormat); + // enable escaping tblDesc.getProperties().setProperty(serdeConstants.ESCAPE_CHAR, "\\"); tblDesc.getProperties().setProperty(serdeConstants.SERIALIZATION_ESCAPE_CRLF, "true"); - //enable extended nesting levels + // enable extended nesting levels tblDesc.getProperties().setProperty( LazySerDeParameters.SERIALIZATION_EXTEND_ADDITIONAL_NESTING_LEVELS, "true"); return tblDesc; http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/pom.xml ---------------------------------------------------------------------- diff --git a/serde/pom.xml b/serde/pom.xml index cea7fce..9f50764 100644 --- a/serde/pom.xml +++ b/serde/pom.xml @@ -41,6 +41,11 @@ </dependency> <dependency> <groupId>org.apache.hive</groupId> + <artifactId>hive-service-rpc</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> <artifactId>hive-shims</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java new file mode 100644 index 0000000..3038037 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2; + +import java.io.IOException; +import java.util.Properties; + +import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; +import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hive.common.util.ReflectionUtil; + +/** + * serialize row by user specified serde and call toString() to make string type result + */ +public class DefaultFetchFormatter<T> implements FetchFormatter<String> { + + private SerDe mSerde; + + @Override + public void initialize(Configuration hconf, Properties props) throws SerDeException { + mSerde = initializeSerde(hconf, props); + } + + private SerDe initializeSerde(Configuration conf, Properties props) throws SerDeException { + String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE); + Class<? 
extends SerDe> serdeClass; + try { + serdeClass = + Class.forName(serdeName, true, JavaUtils.getClassLoader()).asSubclass(SerDe.class); + } catch (ClassNotFoundException e) { + throw new SerDeException(e); + } + // cast only needed for Hadoop 0.17 compatibility + SerDe serde = ReflectionUtil.newInstance(serdeClass, null); + Properties serdeProps = new Properties(); + if (serde instanceof DelimitedJSONSerDe) { + serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT)); + serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT)); + } + SerDeUtils.initializeSerDe(serde, conf, serdeProps, null); + return serde; + } + + @Override + public String convert(Object row, ObjectInspector rowOI) throws Exception { + return mSerde.serialize(row, rowOI).toString(); + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java new file mode 100644 index 0000000..5cc93aa --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2; + +import java.io.Closeable; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * internal-use only + * + * Used in ListSinkOperator for formatting final output + */ +public interface FetchFormatter<T> extends Closeable { + + void initialize(Configuration hconf, Properties props) throws Exception; + + T convert(Object row, ObjectInspector rowOI) throws Exception; +} http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java new file mode 100644 index 0000000..91929f1 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * A No-op fetch formatter. + * ListSinkOperator uses this when reading from the destination table which has data serialized by + * ThriftJDBCBinarySerDe to a SequenceFile. + */ +public class NoOpFetchFormatter<T> implements FetchFormatter<Object> { + + @Override + public void initialize(Configuration hconf, Properties props) throws SerDeException { + } + + // this returns the row as is because this formatter is only called when + // the ThriftJDBCBinarySerDe was used to serialize the rows to thrift-able objects. 
+ @Override + public Object convert(Object row, ObjectInspector rowOI) throws Exception { + return new Object[] { row }; + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java index 90439a2..6e08dfd 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java @@ -71,7 +71,8 @@ public final class SerDeUtils { // lower case null is used within json objects private static final String JSON_NULL = "null"; - + public static final String LIST_SINK_OUTPUT_FORMATTER = "list.sink.output.formatter"; + public static final String LIST_SINK_OUTPUT_PROTOCOL = "list.sink.output.protocol"; public static final Logger LOG = LoggerFactory.getLogger(SerDeUtils.class.getName()); /** http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java new file mode 100644 index 0000000..929c405 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java @@ -0,0 +1,439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.thrift; + +import java.nio.ByteBuffer; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; + +import org.apache.hive.service.rpc.thrift.TBinaryColumn; +import org.apache.hive.service.rpc.thrift.TBoolColumn; +import org.apache.hive.service.rpc.thrift.TByteColumn; +import org.apache.hive.service.rpc.thrift.TColumn; +import org.apache.hive.service.rpc.thrift.TDoubleColumn; +import org.apache.hive.service.rpc.thrift.TI16Column; +import org.apache.hive.service.rpc.thrift.TI32Column; +import org.apache.hive.service.rpc.thrift.TI64Column; +import org.apache.hive.service.rpc.thrift.TStringColumn; + +import com.google.common.primitives.Booleans; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; + +/** + * ColumnBuffer + */ +public class ColumnBuffer extends AbstractList { + + private static final int DEFAULT_SIZE = 100; + + private final Type type; + + private BitSet nulls; + + private int size; + private boolean[] boolVars; + private byte[] byteVars; + private short[] shortVars; + private int[] intVars; + private long[] longVars; + private double[] doubleVars; + private List<String> stringVars; + private List<ByteBuffer> binaryVars; + + public ColumnBuffer(Type type, BitSet nulls, Object values) { + this.type = type; + this.nulls = nulls; + if (type == 
Type.BOOLEAN_TYPE) { + boolVars = (boolean[]) values; + size = boolVars.length; + } else if (type == Type.TINYINT_TYPE) { + byteVars = (byte[]) values; + size = byteVars.length; + } else if (type == Type.SMALLINT_TYPE) { + shortVars = (short[]) values; + size = shortVars.length; + } else if (type == Type.INT_TYPE) { + intVars = (int[]) values; + size = intVars.length; + } else if (type == Type.BIGINT_TYPE) { + longVars = (long[]) values; + size = longVars.length; + } else if (type == Type.DOUBLE_TYPE) { + doubleVars = (double[]) values; + size = doubleVars.length; + } else if (type == Type.BINARY_TYPE) { + binaryVars = (List<ByteBuffer>) values; + size = binaryVars.size(); + } else if (type == Type.STRING_TYPE) { + stringVars = (List<String>) values; + size = stringVars.size(); + } else { + throw new IllegalStateException("invalid union object"); + } + } + + public ColumnBuffer(Type type) { + nulls = new BitSet(); + switch (type) { + case BOOLEAN_TYPE: + boolVars = new boolean[DEFAULT_SIZE]; + break; + case TINYINT_TYPE: + byteVars = new byte[DEFAULT_SIZE]; + break; + case SMALLINT_TYPE: + shortVars = new short[DEFAULT_SIZE]; + break; + case INT_TYPE: + intVars = new int[DEFAULT_SIZE]; + break; + case BIGINT_TYPE: + longVars = new long[DEFAULT_SIZE]; + break; + case FLOAT_TYPE: + case DOUBLE_TYPE: + type = Type.DOUBLE_TYPE; + doubleVars = new double[DEFAULT_SIZE]; + break; + case BINARY_TYPE: + binaryVars = new ArrayList<ByteBuffer>(); + break; + default: + type = Type.STRING_TYPE; + stringVars = new ArrayList<String>(); + } + this.type = type; + } + + public ColumnBuffer(TColumn colValues) { + if (colValues.isSetBoolVal()) { + type = Type.BOOLEAN_TYPE; + nulls = toBitset(colValues.getBoolVal().getNulls()); + boolVars = Booleans.toArray(colValues.getBoolVal().getValues()); + size = boolVars.length; + } else if (colValues.isSetByteVal()) { + type = Type.TINYINT_TYPE; + nulls = toBitset(colValues.getByteVal().getNulls()); + byteVars = 
Bytes.toArray(colValues.getByteVal().getValues()); + size = byteVars.length; + } else if (colValues.isSetI16Val()) { + type = Type.SMALLINT_TYPE; + nulls = toBitset(colValues.getI16Val().getNulls()); + shortVars = Shorts.toArray(colValues.getI16Val().getValues()); + size = shortVars.length; + } else if (colValues.isSetI32Val()) { + type = Type.INT_TYPE; + nulls = toBitset(colValues.getI32Val().getNulls()); + intVars = Ints.toArray(colValues.getI32Val().getValues()); + size = intVars.length; + } else if (colValues.isSetI64Val()) { + type = Type.BIGINT_TYPE; + nulls = toBitset(colValues.getI64Val().getNulls()); + longVars = Longs.toArray(colValues.getI64Val().getValues()); + size = longVars.length; + } else if (colValues.isSetDoubleVal()) { + type = Type.DOUBLE_TYPE; + nulls = toBitset(colValues.getDoubleVal().getNulls()); + doubleVars = Doubles.toArray(colValues.getDoubleVal().getValues()); + size = doubleVars.length; + } else if (colValues.isSetBinaryVal()) { + type = Type.BINARY_TYPE; + nulls = toBitset(colValues.getBinaryVal().getNulls()); + binaryVars = colValues.getBinaryVal().getValues(); + size = binaryVars.size(); + } else if (colValues.isSetStringVal()) { + type = Type.STRING_TYPE; + nulls = toBitset(colValues.getStringVal().getNulls()); + stringVars = colValues.getStringVal().getValues(); + size = stringVars.size(); + } else { + throw new IllegalStateException("invalid union object"); + } + } + + public ColumnBuffer extractSubset(int start, int end) { + BitSet subNulls = nulls.get(start, end); + if (type == Type.BOOLEAN_TYPE) { + ColumnBuffer subset = + new ColumnBuffer(type, subNulls, Arrays.copyOfRange(boolVars, start, end)); + boolVars = Arrays.copyOfRange(boolVars, end, size); + nulls = nulls.get(start, size); + size = boolVars.length; + return subset; + } + if (type == Type.TINYINT_TYPE) { + ColumnBuffer subset = + new ColumnBuffer(type, subNulls, Arrays.copyOfRange(byteVars, start, end)); + byteVars = Arrays.copyOfRange(byteVars, end, size); + nulls 
= nulls.get(start, size);
+      size = byteVars.length;
+      return subset;
+    }
+    if (type == Type.SMALLINT_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(shortVars, start, end));
+      shortVars = Arrays.copyOfRange(shortVars, end, size);
+      nulls = nulls.get(start, size); // NOTE(review): values are kept from `end`, but nulls are re-sliced from `start` (same in every branch); looks like this should be nulls.get(end, size) so null bits stay aligned with the remaining values - confirm
+      size = shortVars.length;
+      return subset;
+    }
+    if (type == Type.INT_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(intVars, start, end));
+      intVars = Arrays.copyOfRange(intVars, end, size);
+      nulls = nulls.get(start, size);
+      size = intVars.length;
+      return subset;
+    }
+    if (type == Type.BIGINT_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(longVars, start, end));
+      longVars = Arrays.copyOfRange(longVars, end, size);
+      nulls = nulls.get(start, size);
+      size = longVars.length;
+      return subset;
+    }
+    if (type == Type.DOUBLE_TYPE) {
+      ColumnBuffer subset =
+          new ColumnBuffer(type, subNulls, Arrays.copyOfRange(doubleVars, start, end));
+      doubleVars = Arrays.copyOfRange(doubleVars, end, size);
+      nulls = nulls.get(start, size);
+      size = doubleVars.length;
+      return subset;
+    }
+    if (type == Type.BINARY_TYPE) {
+      ColumnBuffer subset = new ColumnBuffer(type, subNulls, binaryVars.subList(start, end));
+      binaryVars = binaryVars.subList(end, binaryVars.size());
+      nulls = nulls.get(start, size);
+      size = binaryVars.size();
+      return subset;
+    }
+    if (type == Type.STRING_TYPE) {
+      ColumnBuffer subset = new ColumnBuffer(type, subNulls, stringVars.subList(start, end));
+      stringVars = stringVars.subList(end, stringVars.size());
+      nulls = nulls.get(start, size);
+      size = stringVars.size();
+      return subset;
+    }
+    throw new IllegalStateException("invalid union object"); // reached when type matches none of the branches above
+  }
+
+  private static final byte[] MASKS = new byte[] { // MASKS[k] selects bit k (least significant first) of a mask byte
+      0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, (byte)0x80
+  };
+
+  private static BitSet toBitset(byte[] nulls) { // unpacks a mask: bit i is set iff (nulls[i / 8] & MASKS[i % 8]) != 0
+    BitSet bitset = new BitSet();
+    int bits = nulls.length * 8;
+    for (int i = 0; i < bits; i++) {
+      bitset.set(i, (nulls[i / 8] & MASKS[i % 8]) != 0);
+    }
+    return bitset;
+  }
+
+  private static byte[] toBinary(BitSet bitset) { // inverse of toBitset; packs the bits into bitset.length() / 8 + 1 bytes
+    byte[] nulls = new byte[1 + (bitset.length() / 8)];
+    for (int i = 0; i < bitset.length(); i++) {
+      nulls[i / 8] |= bitset.get(i) ? MASKS[i % 8] : 0;
+    }
+    return nulls;
+  }
+
+  public Type getType() { // element type stored by this buffer
+    return type;
+  }
+
+  @Override
+  public Object get(int index) { // value at index, or null if its null bit is set; types without a case below also return null
+    if (nulls.get(index)) {
+      return null;
+    }
+    switch (type) {
+    case BOOLEAN_TYPE:
+      return boolVars[index];
+    case TINYINT_TYPE:
+      return byteVars[index];
+    case SMALLINT_TYPE:
+      return shortVars[index];
+    case INT_TYPE:
+      return intVars[index];
+    case BIGINT_TYPE:
+      return longVars[index];
+    case DOUBLE_TYPE:
+      return doubleVars[index];
+    case STRING_TYPE:
+      return stringVars.get(index);
+    case BINARY_TYPE:
+      return binaryVars.get(index).array(); // backing array of the stored ByteBuffer, not a copy
+    }
+    return null;
+  }
+
+  @Override
+  public int size() { // number of values currently held
+    return size;
+  }
+
+  public TColumn toTColumn() { // builds the thrift TColumn for this type from the first size values plus the packed null mask
+    TColumn value = new TColumn();
+    ByteBuffer nullMasks = ByteBuffer.wrap(toBinary(nulls));
+    switch (type) {
+    case BOOLEAN_TYPE:
+      value.setBoolVal(new TBoolColumn(Booleans.asList(Arrays.copyOfRange(boolVars, 0, size)),
+          nullMasks));
+      break;
+    case TINYINT_TYPE:
+      value.setByteVal(new TByteColumn(Bytes.asList(Arrays.copyOfRange(byteVars, 0, size)),
+          nullMasks));
+      break;
+    case SMALLINT_TYPE:
+      value.setI16Val(new TI16Column(Shorts.asList(Arrays.copyOfRange(shortVars, 0, size)),
+          nullMasks));
+      break;
+    case INT_TYPE:
+      value.setI32Val(new TI32Column(Ints.asList(Arrays.copyOfRange(intVars, 0, size)), nullMasks));
+      break;
+    case BIGINT_TYPE:
+      value
+          .setI64Val(new TI64Column(Longs.asList(Arrays.copyOfRange(longVars, 0, size)), nullMasks));
+      break;
+    case DOUBLE_TYPE:
+      value.setDoubleVal(new TDoubleColumn(Doubles.asList(Arrays.copyOfRange(doubleVars, 0, size)),
+          nullMasks));
+      break;
+    case STRING_TYPE:
+      value.setStringVal(new TStringColumn(stringVars, nullMasks)); // the list itself is passed, not a copy
+      break;
+    case BINARY_TYPE:
+      value.setBinaryVal(new TBinaryColumn(binaryVars, nullMasks)); // the list itself is passed, not a copy
+      break;
+    }
+    return value;
+  }
+
+  private static final ByteBuffer EMPTY_BINARY = ByteBuffer.allocate(0); // stored for null BINARY entries; the null bit marks them
+  private static final String EMPTY_STRING = ""; // stored for null entries in the default (string) case
+
+  public void addValue(Object field) { // appends field using this buffer's own type
+    addValue(this.type, field);
+  }
+
+  public void addValue(Type type, Object field) { // appends one value (null allowed) and records its null bit at the append position
+    switch (type) {
+    case BOOLEAN_TYPE:
+      nulls.set(size, field == null);
+      boolVars()[size] = field == null ? true : (Boolean) field; // placeholder for null; masked by the null bit
+      break;
+    case TINYINT_TYPE:
+      nulls.set(size, field == null);
+      byteVars()[size] = field == null ? 0 : (Byte) field;
+      break;
+    case SMALLINT_TYPE:
+      nulls.set(size, field == null);
+      shortVars()[size] = field == null ? 0 : (Short) field;
+      break;
+    case INT_TYPE:
+      nulls.set(size, field == null);
+      intVars()[size] = field == null ? 0 : (Integer) field;
+      break;
+    case BIGINT_TYPE:
+      nulls.set(size, field == null);
+      longVars()[size] = field == null ? 0 : (Long) field;
+      break;
+    case FLOAT_TYPE:
+      nulls.set(size, field == null);
+      doubleVars()[size] = field == null ? 0 : new Double(field.toString()); // FLOAT is stored widened to double via its string form
+      break;
+    case DOUBLE_TYPE:
+      nulls.set(size, field == null);
+      doubleVars()[size] = field == null ? 0 : (Double) field;
+      break;
+    case BINARY_TYPE:
+      nulls.set(binaryVars.size(), field == null);
+      binaryVars.add(field == null ? EMPTY_BINARY : ByteBuffer.wrap((byte[]) field));
+      break;
+    default: // any other type is stored as its String.valueOf form
+      nulls.set(stringVars.size(), field == null);
+      stringVars.add(field == null ? EMPTY_STRING : String.valueOf(field));
+      break;
+    }
+    size++;
+  }
+
+  private boolean[] boolVars() { // returns boolVars, doubling its capacity first when it is full
+    if (boolVars.length == size) { // NOTE(review): size << 1 is 0 when size is 0, so this assumes the arrays start with non-zero capacity - confirm in the constructor
+      boolean[] newVars = new boolean[size << 1];
+      System.arraycopy(boolVars, 0, newVars, 0, size);
+      return boolVars = newVars;
+    }
+    return boolVars;
+  }
+
+  private byte[] byteVars() { // same growth policy as boolVars()
+    if (byteVars.length == size) {
+      byte[] newVars = new byte[size << 1];
+      System.arraycopy(byteVars, 0, newVars, 0, size);
+      return byteVars = newVars;
+    }
+    return byteVars;
+  }
+
+  private short[] shortVars() { // same growth policy as boolVars()
+    if (shortVars.length == size) {
+      short[] newVars = new short[size << 1];
+      System.arraycopy(shortVars, 0, newVars, 0, size);
+      return shortVars = newVars;
+    }
+    return shortVars;
+  }
+
+  private int[] intVars() { // same growth policy as boolVars()
+    if (intVars.length == size) {
+      int[] newVars = new int[size << 1];
+      System.arraycopy(intVars, 0, newVars, 0, size);
+      return intVars = newVars;
+    }
+    return intVars;
+  }
+
+  private long[] longVars() { // same growth policy as boolVars()
+    if (longVars.length == size) {
+      long[] newVars = new long[size << 1];
+      System.arraycopy(longVars, 0, newVars, 0, size);
+      return longVars = newVars;
+    }
+    return longVars;
+  }
+
+  private double[] doubleVars() { // same growth policy as boolVars()
+    if (doubleVars.length == size) {
+      double[] newVars = new double[size << 1];
+      System.arraycopy(doubleVars, 0, newVars, 0, size);
+      return doubleVars = newVars;
+    }
+    return doubleVars;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
new file mode 100644
index 0000000..a4c120e
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java
@@ -0,0 +1,40 @@
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.io.IOException;
+import java.util.List;
+import
java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.FetchFormatter;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class ThriftFormatter implements FetchFormatter<Object> { // converts a struct row into an Object[] of per-field thrift payloads
+
+  int protocol; // set from SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL in initialize(), default 0
+
+  @Override
+  public void initialize(Configuration hconf, Properties props) throws Exception {
+    protocol = hconf.getInt(SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL, 0);
+  }
+
+  @Override
+  public Object convert(Object row, ObjectInspector rowOI) throws Exception { // returns one entry per struct field, in field order
+    StructObjectInspector structOI = (StructObjectInspector) rowOI;
+    List<? extends StructField> fields = structOI.getAllStructFieldRefs();
+    Object[] converted = new Object[fields.size()];
+    for (int i = 0; i < converted.length; i++) {
+      StructField fieldRef = fields.get(i);
+      Object field = structOI.getStructFieldData(row, fieldRef);
+      converted[i] = field == null ? null : // null fields stay null
+          SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector(), protocol);
+    }
+    return converted;
+  }
+
+  @Override
+  public void close() throws IOException { // nothing to release
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/fb230f9d/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
new file mode 100644
index 0000000..5c31974
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.thrift;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes the final output of a query directly into thrift {@link TColumn} objects inside the
+ * SerDe. Use this SerDe only for final result sets; it is used when
+ * HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS is set to true.
+ *
+ * <p>Rows are buffered column-wise. {@link #serialize} emits a batch once
+ * HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE rows are buffered, or when it is called with a
+ * {@code null} row (end of input, from FileSinkOperator.closeOp()). Each batch is a single
+ * {@link BytesWritable} holding the TCompactProtocol encoding of every column, in column order.
+ */
+public class ThriftJDBCBinarySerDe extends AbstractSerDe {
+  public static final Logger LOG = LoggerFactory.getLogger(ThriftJDBCBinarySerDe.class.getName());
+  private List<String> columnNames;
+  private List<TypeInfo> columnTypes;
+  private ColumnBuffer[] columnBuffers;
+  private TypeInfo rowTypeInfo;
+  private ArrayList<Object> row;
+  private final BytesWritable serializedBytesWritable = new BytesWritable();
+  private final ByteStream.Output output = new ByteStream.Output();
+  private final TProtocol protocol = new TCompactProtocol(new TIOStreamTransport(output));
+  private final ThriftFormatter thriftFormatter = new ThriftFormatter();
+  // Rows per batch; read from HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE in initialize().
+  private int MAX_BUFFERED_ROWS;
+  // Rows buffered since the last emitted batch.
+  private int count;
+  private StructObjectInspector rowObjectInspector;
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    MAX_BUFFERED_ROWS =
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+    // Column names are comma-separated; a missing or empty property means no columns.
+    String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+    if (columnNameProperty == null || columnNameProperty.isEmpty()) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+    if (columnTypeProperty == null || columnTypeProperty.isEmpty()) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    rowObjectInspector =
+        (StructObjectInspector) TypeInfoUtils
+            .getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo);
+
+    initializeRowAndColumns();
+    try {
+      thriftFormatter.initialize(conf, tbl);
+    } catch (Exception e) {
+      // Propagate the failure instead of continuing with a formatter that failed to initialize.
+      throw new SerDeException(e);
+    }
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return BytesWritable.class;
+  }
+
+  /**
+   * Writes every buffered column as a thrift TColumn using TCompactProtocol, then resets the
+   * column buffers and the row counter. Returns the encoded batch; the same writable instance is
+   * reused for every batch.
+   *
+   * @throws SerDeException if thrift fails to write a column
+   */
+  private Writable serializeBatch() throws SerDeException {
+    output.reset();
+    for (ColumnBuffer columnBuffer : columnBuffers) {
+      try {
+        columnBuffer.toTColumn().write(protocol);
+      } catch (TException e) {
+        throw new SerDeException(e);
+      }
+    }
+    initializeRowAndColumns();
+    count = 0;
+    serializedBytesWritable.set(output.getData(), 0, output.getLength());
+    return serializedBytesWritable;
+  }
+
+  // Creates the reusable row object and one empty ColumnBuffer per column. Called again after each
+  // emitted batch so that rows already written are not emitted a second time at closeOp().
+  private void initializeRowAndColumns() {
+    row = new ArrayList<Object>(columnNames.size());
+    for (int i = 0; i < columnNames.size(); i++) {
+      row.add(null);
+    }
+    columnBuffers = new ColumnBuffer[columnNames.size()];
+    for (int i = 0; i < columnBuffers.length; i++) {
+      columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i)));
+    }
+  }
+
+  /**
+   * Buffers one row, or flushes the buffered rows.
+   *
+   * @param obj the row to buffer; {@code null} means no more rows will arrive (closeOp()) and
+   *     flushes whatever is buffered
+   * @param objInspector inspector for {@code obj}; must be a StructObjectInspector
+   * @return the serialized batch if one was emitted (null row, or MAX_BUFFERED_ROWS rows
+   *     buffered), otherwise {@code null}
+   * @throws SerDeException if converting the row or writing the batch fails
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    if (obj == null) {
+      return serializeBatch();
+    }
+    count += 1;
+    try {
+      // One thrift-ready value per column, in column order.
+      Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector);
+      for (int i = 0; i < columnNames.size(); i++) {
+        columnBuffers[i].addValue(formattedRow[i]);
+      }
+    } catch (Exception e) {
+      throw new SerDeException(e);
+    }
+    if (count == MAX_BUFFERED_ROWS) {
+      return serializeBatch();
+    }
+    return null;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    return null;
+  }
+
+  /**
+   * Returns the bytes of a serialized batch; the caller decodes them with the Thrift protocol.
+   */
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    // NOTE(review): getBytes() returns the backing array, which may be longer than getLength();
+    // callers must not rely on the array length - confirm, or switch to copyBytes().
+    return ((BytesWritable) blob).getBytes();
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return rowObjectInspector;
+  }
+
+}
