HIVE-19308: Provide an Arrow stream reader for external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)
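For context on how the new Arrow reader is meant to be driven, the patch's own test path (BaseJdbcWithMiniLlap#processQuery, specialized by TestJdbcWithMiniLlapArrow) shows the expected external-client flow. The stand-alone sketch below mirrors that test code; the JDBC URL, user, query, table name, and the Long.MAX_VALUE allocator limit are illustrative placeholders, not part of the patch.

import java.util.UUID;

import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class LlapArrowClientSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connection details and query; adjust for a real HiveServer2/LLAP cluster.
    String url = "jdbc:hive2://localhost:10000/default";
    String user = System.getProperty("user.name");
    String query = "select under_col, value from testtab1";
    String handleId = UUID.randomUUID().toString();

    // Arrow-backed row reader added by this patch; the constructor argument is the
    // Arrow allocator ceiling (the new test simply passes Long.MAX_VALUE).
    InputFormat<NullWritable, Row> inputFormat = new LlapArrowRowInputFormat(Long.MAX_VALUE);

    JobConf job = new JobConf();
    job.set(LlapBaseInputFormat.URL_KEY, url);
    job.set(LlapBaseInputFormat.USER_KEY, user);
    job.set(LlapBaseInputFormat.PWD_KEY, user);
    job.set(LlapBaseInputFormat.QUERY_KEY, query);
    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);

    // Plan the query through HiveServer2, then stream rows back from LLAP per split.
    InputSplit[] splits = inputFormat.getSplits(job, 1);
    for (InputSplit split : splits) {
      RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
      Row row = reader.createValue();
      while (reader.next(NullWritable.get(), row)) {
        System.out.println(row.getValue(0) + "\t" + row.getValue(1));
      }
      reader.close();
    }
    // Release the LLAP connections registered under this handle id.
    LlapBaseInputFormat.close(handleId);
  }
}

As in the test, LlapBaseInputFormat.close(handleId) (or closeAll()) releases the connections tracked under the handle id, and the server side must have hive.llap.output.format.arrow enabled, which this patch now defaults to true.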
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2334a0dd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2334a0dd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2334a0dd

Branch: refs/heads/branch-3
Commit: 2334a0ddfbd1a96d5fa5891a51be57f6cf408789
Parents: f7f90a0
Author: Jason Dere <[email protected]>
Authored: Mon May 21 13:47:43 2018 -0700
Committer: Vineet Garg <[email protected]>
Committed: Tue May 29 13:58:34 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../hive/jdbc/AbstractJdbcTriggersTest.java     |   4 +-
 .../apache/hive/jdbc/BaseJdbcWithMiniLlap.java  | 615 +++++++++++++++++++
 .../apache/hive/jdbc/TestJdbcWithMiniLlap.java  | 603 ------------------
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java    | 230 +++++++
 .../hive/jdbc/TestJdbcWithMiniLlapRow.java      |  45 ++
 .../hadoop/hive/llap/LlapBaseRecordReader.java  | 101 ++-
 .../hadoop/hive/llap/LlapRowRecordReader.java   |  26 +-
 llap-ext-client/pom.xml                         |   5 +
 .../hive/llap/LlapArrowBatchRecordReader.java   |  82 +++
 .../hive/llap/LlapArrowRowInputFormat.java      |  53 ++
 .../hive/llap/LlapArrowRowRecordReader.java     | 107 ++++
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |  27 +-
 pom.xml                                         |   1 +
 .../hive/ql/io/arrow/ArrowWrapperWritable.java  |  18 +-
 .../hive/ql/io/arrow/RootAllocatorFactory.java  |   9 +
 .../hadoop/hive/llap/TestLlapOutputFormat.java  |   1 +
 17 files changed, 1254 insertions(+), 675 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8780374..8347f7f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4161,7 +4161,7 @@ public class HiveConf extends Configuration {
         Constants.LLAP_LOGGER_NAME_RFA, Constants.LLAP_LOGGER_NAME_CONSOLE),
       "logger used for llap-daemons."),
-    LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", false,
+    LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", true,
       "Whether LLapOutputFormatService should output arrow batches"),

     HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",

http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 17e44bb..7d5172b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -90,7 +90,7 @@ public abstract class AbstractJdbcTriggersTest {

   @Before
   public void setUp() throws Exception {
-    hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+    hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
   }

   @After
@@ -124,7 +124,7 @@ public abstract class AbstractJdbcTriggersTest {
      throws Exception {
Connection con = hs2Conn; - TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); + BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); createSleepUDF(); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java new file mode 100644 index 0000000..11017f6 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java @@ -0,0 +1,615 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.lang.reflect.Field; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.FieldDesc; +import org.apache.hadoop.hive.llap.LlapRowRecordReader; +import org.apache.hadoop.hive.llap.Row; 
+import org.apache.hadoop.hive.llap.Schema; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; + +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import org.datanucleus.ClassLoaderResolver; +import org.datanucleus.NucleusContext; +import org.datanucleus.api.jdo.JDOPersistenceManagerFactory; +import org.datanucleus.AbstractNucleusContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.apache.hadoop.mapred.InputFormat; + +/** + * Specialize this base class for different serde's/formats + * {@link #beforeTest(boolean) beforeTest} should be called + * by sub-classes in a {@link org.junit.BeforeClass} initializer + */ +public abstract class BaseJdbcWithMiniLlap { + private static MiniHS2 miniHS2 = null; + private static String dataFileDir; + private static Path kvDataFilePath; + private static Path dataTypesFilePath; + + private static HiveConf conf = null; + private static Connection hs2Conn = null; + + // This method should be called by sub-classes in a @BeforeClass initializer + public static void beforeTest(boolean useArrow) throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + if (confDir != null && !confDir.isEmpty()) { + HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation()); + } + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + if(useArrow) { + conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); + } else { + conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false); + } + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + dataTypesFilePath = new Path(dataFileDir, "datatypes.txt"); + Map<String, String> confOverlay = new HashMap<String, String>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @Before + public void setUp() throws Exception { + hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + } + + public static Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException { + Connection conn = DriverManager.getConnection(jdbcURL, user, pwd); + conn.createStatement().execute("set hive.support.concurrency = false"); + return conn; + } + + @After + public void tearDown() throws Exception { + LlapBaseInputFormat.closeAll(); + hs2Conn.close(); + } + + @AfterClass + public static void afterTest() throws Exception { + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + } + + private 
void createTestTable(String tableName) throws Exception { + createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString()); + } + + public static void createTestTable(Connection connection, String database, String tableName, String srcFile) throws + Exception { + Statement stmt = connection.createStatement(); + + if (database != null) { + stmt.execute("CREATE DATABASE IF NOT EXISTS " + database); + stmt.execute("USE " + database); + } + + // create table + stmt.execute("DROP TABLE IF EXISTS " + tableName); + stmt.execute("CREATE TABLE " + tableName + + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'"); + + // load data + stmt.execute("load data local inpath '" + srcFile + "' into table " + tableName); + + ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); + assertTrue(res.next()); + assertEquals("val_238", res.getString(2)); + res.close(); + stmt.close(); + } + + protected void createDataTypesTable(String tableName) throws Exception { + Statement stmt = hs2Conn.createStatement(); + + // create table + stmt.execute("DROP TABLE IF EXISTS " + tableName); + // tables with various types + stmt.execute("create table " + tableName + + " (c1 int, c2 boolean, c3 double, c4 string," + + " c5 array<int>, c6 map<int,string>, c7 map<string,string>," + + " c8 struct<r:string,s:int,t:double>," + + " c9 tinyint, c10 smallint, c11 float, c12 bigint," + + " c13 array<array<string>>," + + " c14 map<int, map<int,int>>," + + " c15 struct<r:int,s:struct<a:int,b:string>>," + + " c16 array<struct<m:map<string,string>,n:int>>," + + " c17 timestamp, " + + " c18 decimal(16,7), " + + " c19 binary, " + + " c20 date," + + " c21 varchar(20)," + + " c22 char(15)," + + " c23 binary" + + ")"); + stmt.execute("load data local inpath '" + + dataTypesFilePath.toString() + "' into table " + tableName); + stmt.close(); + } + + @Test(timeout = 60000) + public void testLlapInputFormatEndToEnd() throws Exception { + createTestTable("testtab1"); + + int rowCount; + + RowCollector rowCollector = new RowCollector(); + String query = "select * from testtab1 where under_col = 0"; + rowCount = processQuery(query, 1, rowCollector); + assertEquals(3, rowCount); + assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(0)); + assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(1)); + assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(2)); + + // Try empty rows query + rowCollector.rows.clear(); + query = "select * from testtab1 where true = false"; + rowCount = processQuery(query, 1, rowCollector); + assertEquals(0, rowCount); + } + + @Test(timeout = 60000) + public void testNonAsciiStrings() throws Exception { + createTestTable("testtab_nonascii"); + + RowCollector rowCollector = new RowCollector(); + String nonAscii = "à côté du garçon"; + String query = "select value, '" + nonAscii + "' from testtab_nonascii where under_col=0"; + int rowCount = processQuery(query, 1, rowCollector); + assertEquals(3, rowCount); + + assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(0)); + assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(1)); + assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(2)); + } + + @Test(timeout = 60000) + public void testEscapedStrings() throws Exception { + createTestTable("testtab1"); + + RowCollector rowCollector = new RowCollector(); + String expectedVal1 = "'a',\"b\",\\c\\"; + String expectedVal2 = "multi\nline"; + String query = "select value, 
'\\'a\\',\"b\",\\\\c\\\\', 'multi\\nline' from testtab1 where under_col=0"; + int rowCount = processQuery(query, 1, rowCollector); + assertEquals(3, rowCount); + + assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(0)); + assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(1)); + assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(2)); + } + + @Test(timeout = 60000) + public void testDataTypes() throws Exception { + createDataTypesTable("datatypes"); + RowCollector2 rowCollector = new RowCollector2(); + String query = "select * from datatypes"; + int rowCount = processQuery(query, 1, rowCollector); + assertEquals(3, rowCount); + + // Verify schema + String[][] colNameTypes = new String[][] { + {"datatypes.c1", "int"}, + {"datatypes.c2", "boolean"}, + {"datatypes.c3", "double"}, + {"datatypes.c4", "string"}, + {"datatypes.c5", "array<int>"}, + {"datatypes.c6", "map<int,string>"}, + {"datatypes.c7", "map<string,string>"}, + {"datatypes.c8", "struct<r:string,s:int,t:double>"}, + {"datatypes.c9", "tinyint"}, + {"datatypes.c10", "smallint"}, + {"datatypes.c11", "float"}, + {"datatypes.c12", "bigint"}, + {"datatypes.c13", "array<array<string>>"}, + {"datatypes.c14", "map<int,map<int,int>>"}, + {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"}, + {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"}, + {"datatypes.c17", "timestamp"}, + {"datatypes.c18", "decimal(16,7)"}, + {"datatypes.c19", "binary"}, + {"datatypes.c20", "date"}, + {"datatypes.c21", "varchar(20)"}, + {"datatypes.c22", "char(15)"}, + {"datatypes.c23", "binary"}, + }; + FieldDesc fieldDesc; + assertEquals(23, rowCollector.numColumns); + for (int idx = 0; idx < rowCollector.numColumns; ++idx) { + fieldDesc = rowCollector.schema.getColumns().get(idx); + assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName()); + assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName()); + } + + // First row is all nulls + Object[] rowValues = rowCollector.rows.get(0); + for (int idx = 0; idx < rowCollector.numColumns; ++idx) { + assertEquals("idx=" + idx, null, rowValues[idx]); + } + + // Second Row + rowValues = rowCollector.rows.get(1); + assertEquals(Integer.valueOf(-1), rowValues[0]); + assertEquals(Boolean.FALSE, rowValues[1]); + assertEquals(Double.valueOf(-1.1d), rowValues[2]); + assertEquals("", rowValues[3]); + + List<?> c5Value = (List<?>) rowValues[4]; + assertEquals(0, c5Value.size()); + + Map<?,?> c6Value = (Map<?,?>) rowValues[5]; + assertEquals(0, c6Value.size()); + + Map<?,?> c7Value = (Map<?,?>) rowValues[6]; + assertEquals(0, c7Value.size()); + + List<?> c8Value = (List<?>) rowValues[7]; + assertEquals(null, c8Value.get(0)); + assertEquals(null, c8Value.get(1)); + assertEquals(null, c8Value.get(2)); + + assertEquals(Byte.valueOf((byte) -1), rowValues[8]); + assertEquals(Short.valueOf((short) -1), rowValues[9]); + assertEquals(Float.valueOf(-1.0f), rowValues[10]); + assertEquals(Long.valueOf(-1l), rowValues[11]); + + List<?> c13Value = (List<?>) rowValues[12]; + assertEquals(0, c13Value.size()); + + Map<?,?> c14Value = (Map<?,?>) rowValues[13]; + assertEquals(0, c14Value.size()); + + List<?> c15Value = (List<?>) rowValues[14]; + assertEquals(null, c15Value.get(0)); + assertEquals(null, c15Value.get(1)); + + List<?> c16Value = (List<?>) rowValues[15]; + assertEquals(0, c16Value.size()); + + assertEquals(null, rowValues[16]); + 
assertEquals(null, rowValues[17]); + assertEquals(null, rowValues[18]); + assertEquals(null, rowValues[19]); + assertEquals(null, rowValues[20]); + assertEquals(null, rowValues[21]); + assertEquals(null, rowValues[22]); + + // Third row + rowValues = rowCollector.rows.get(2); + assertEquals(Integer.valueOf(1), rowValues[0]); + assertEquals(Boolean.TRUE, rowValues[1]); + assertEquals(Double.valueOf(1.1d), rowValues[2]); + assertEquals("1", rowValues[3]); + + c5Value = (List<?>) rowValues[4]; + assertEquals(2, c5Value.size()); + assertEquals(Integer.valueOf(1), c5Value.get(0)); + assertEquals(Integer.valueOf(2), c5Value.get(1)); + + c6Value = (Map<?,?>) rowValues[5]; + assertEquals(2, c6Value.size()); + assertEquals("x", c6Value.get(Integer.valueOf(1))); + assertEquals("y", c6Value.get(Integer.valueOf(2))); + + c7Value = (Map<?,?>) rowValues[6]; + assertEquals(1, c7Value.size()); + assertEquals("v", c7Value.get("k")); + + c8Value = (List<?>) rowValues[7]; + assertEquals("a", c8Value.get(0)); + assertEquals(Integer.valueOf(9), c8Value.get(1)); + assertEquals(Double.valueOf(2.2d), c8Value.get(2)); + + assertEquals(Byte.valueOf((byte) 1), rowValues[8]); + assertEquals(Short.valueOf((short) 1), rowValues[9]); + assertEquals(Float.valueOf(1.0f), rowValues[10]); + assertEquals(Long.valueOf(1l), rowValues[11]); + + c13Value = (List<?>) rowValues[12]; + assertEquals(2, c13Value.size()); + List<?> listVal = (List<?>) c13Value.get(0); + assertEquals("a", listVal.get(0)); + assertEquals("b", listVal.get(1)); + listVal = (List<?>) c13Value.get(1); + assertEquals("c", listVal.get(0)); + assertEquals("d", listVal.get(1)); + + c14Value = (Map<?,?>) rowValues[13]; + assertEquals(2, c14Value.size()); + Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1)); + assertEquals(2, mapVal.size()); + assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11))); + assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13))); + mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2)); + assertEquals(1, mapVal.size()); + assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21))); + + c15Value = (List<?>) rowValues[14]; + assertEquals(Integer.valueOf(1), c15Value.get(0)); + listVal = (List<?>) c15Value.get(1); + assertEquals(2, listVal.size()); + assertEquals(Integer.valueOf(2), listVal.get(0)); + assertEquals("x", listVal.get(1)); + + c16Value = (List<?>) rowValues[15]; + assertEquals(2, c16Value.size()); + listVal = (List<?>) c16Value.get(0); + assertEquals(2, listVal.size()); + mapVal = (Map<?,?>) listVal.get(0); + assertEquals(0, mapVal.size()); + assertEquals(Integer.valueOf(1), listVal.get(1)); + listVal = (List<?>) c16Value.get(1); + mapVal = (Map<?,?>) listVal.get(0); + assertEquals(2, mapVal.size()); + assertEquals("b", mapVal.get("a")); + assertEquals("d", mapVal.get("c")); + assertEquals(Integer.valueOf(2), listVal.get(1)); + + assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]); + assertEquals(new BigDecimal("123456789.123456"), rowValues[17]); + assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]); + assertEquals(Date.valueOf("2013-01-01"), rowValues[19]); + assertEquals("abc123", rowValues[20]); + assertEquals("abc123 ", rowValues[21]); + assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]); + } + + private interface RowProcessor { + void process(Row row); + } + + protected static class RowCollector implements RowProcessor { + ArrayList<String[]> rows = new ArrayList<String[]>(); + Schema schema = null; + int numColumns = 
0; + + public void process(Row row) { + if (schema == null) { + schema = row.getSchema(); + numColumns = schema.getColumns().size(); + } + + String[] arr = new String[numColumns]; + for (int idx = 0; idx < numColumns; ++idx) { + Object val = row.getValue(idx); + arr[idx] = (val == null ? null : val.toString()); + } + rows.add(arr); + } + } + + // Save the actual values from each row as opposed to the String representation. + protected static class RowCollector2 implements RowProcessor { + ArrayList<Object[]> rows = new ArrayList<Object[]>(); + Schema schema = null; + int numColumns = 0; + + public void process(Row row) { + if (schema == null) { + schema = row.getSchema(); + numColumns = schema.getColumns().size(); + } + + Object[] arr = new Object[numColumns]; + for (int idx = 0; idx < numColumns; ++idx) { + arr[idx] = row.getValue(idx); + } + rows.add(arr); + } + } + + protected int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception { + return processQuery(null, query, numSplits, rowProcessor); + } + + protected abstract InputFormat<NullWritable, Row> getInputFormat(); + + private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception { + String url = miniHS2.getJdbcURL(); + String user = System.getProperty("user.name"); + String pwd = user; + String handleId = UUID.randomUUID().toString(); + + InputFormat<NullWritable, Row> inputFormat = getInputFormat(); + + // Get splits + JobConf job = new JobConf(conf); + job.set(LlapBaseInputFormat.URL_KEY, url); + job.set(LlapBaseInputFormat.USER_KEY, user); + job.set(LlapBaseInputFormat.PWD_KEY, pwd); + job.set(LlapBaseInputFormat.QUERY_KEY, query); + job.set(LlapBaseInputFormat.HANDLE_ID, handleId); + if (currentDatabase != null) { + job.set(LlapBaseInputFormat.DB_KEY, currentDatabase); + } + + InputSplit[] splits = inputFormat.getSplits(job, numSplits); + assertTrue(splits.length > 0); + + // Fetch rows from splits + boolean first = true; + int rowCount = 0; + for (InputSplit split : splits) { + System.out.println("Processing split " + split.getLocations()); + + int numColumns = 2; + RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null); + Row row = reader.createValue(); + while (reader.next(NullWritable.get(), row)) { + rowProcessor.process(row); + ++rowCount; + } + reader.close(); + } + LlapBaseInputFormat.close(handleId); + + return rowCount; + } + + /** + * Test CLI kill command of a query that is running. + * We spawn 2 threads - one running the query and + * the other attempting to cancel. + * We're using a dummy udf to simulate a query, + * that runs for a sufficiently long time. 
+ * @throws Exception + */ + @Test + public void testKillQuery() throws Exception { + String tableName = "testtab1"; + createTestTable(tableName); + Connection con = hs2Conn; + Connection con2 = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + + String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); + Statement stmt1 = con.createStatement(); + Statement stmt2 = con2.createStatement(); + stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'"); + stmt1.close(); + final Statement stmt = con.createStatement(); + + ExceptionHolder tExecuteHolder = new ExceptionHolder(); + ExceptionHolder tKillHolder = new ExceptionHolder(); + + // Thread executing the query + Thread tExecute = new Thread(new Runnable() { + @Override + public void run() { + try { + System.out.println("Executing query: "); + // The test table has 500 rows, so total query time should be ~ 500*500ms + stmt.executeQuery("select sleepMsUDF(t1.under_col, 100), t1.under_col, t2.under_col " + + "from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col"); + fail("Expecting SQLException"); + } catch (SQLException e) { + tExecuteHolder.throwable = e; + } + } + }); + // Thread killing the query + Thread tKill = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(2000); + String queryId = ((HiveStatement) stmt).getQueryId(); + System.out.println("Killing query: " + queryId); + + stmt2.execute("kill query '" + queryId + "'"); + stmt2.close(); + } catch (Exception e) { + tKillHolder.throwable = e; + } + } + }); + + tExecute.start(); + tKill.start(); + tExecute.join(); + tKill.join(); + stmt.close(); + con2.close(); + + assertNotNull("tExecute", tExecuteHolder.throwable); + assertNull("tCancel", tKillHolder.throwable); + } + + private static class ExceptionHolder { + Throwable throwable; + } +} + http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java deleted file mode 100644 index 68a8e21..0000000 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java +++ /dev/null @@ -1,603 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hive.jdbc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.lang.reflect.Field; -import java.math.BigDecimal; -import java.net.URL; -import java.sql.Connection; -import java.sql.Date; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.FieldDesc; -import org.apache.hadoop.hive.llap.LlapRowRecordReader; -import org.apache.hadoop.hive.llap.Row; -import org.apache.hadoop.hive.llap.Schema; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; - -import org.apache.hive.jdbc.miniHS2.MiniHS2; -import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; -import org.apache.hadoop.hive.llap.LlapBaseInputFormat; -import org.apache.hadoop.hive.llap.LlapRowInputFormat; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; - -import org.datanucleus.ClassLoaderResolver; -import org.datanucleus.NucleusContext; -import org.datanucleus.api.jdo.JDOPersistenceManagerFactory; -import org.datanucleus.AbstractNucleusContext; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestJdbcWithMiniLlap { - private static MiniHS2 miniHS2 = null; - private static String dataFileDir; - private static Path kvDataFilePath; - private static Path dataTypesFilePath; - - private static HiveConf conf = null; - private Connection hs2Conn = null; - - @BeforeClass - public static void beforeTest() throws Exception { - Class.forName(MiniHS2.getJdbcDriverName()); - - String confDir = "../../data/conf/llap/"; - if (confDir != null && !confDir.isEmpty()) { - 
HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml")); - System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation()); - } - - conf = new HiveConf(); - conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/tez-site.xml")); - - miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); - - dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); - kvDataFilePath = new Path(dataFileDir, "kv1.txt"); - dataTypesFilePath = new Path(dataFileDir, "datatypes.txt"); - Map<String, String> confOverlay = new HashMap<String, String>(); - miniHS2.start(confOverlay); - miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); - } - - @Before - public void setUp() throws Exception { - hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - } - - public static Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException { - Connection conn = DriverManager.getConnection(jdbcURL, user, pwd); - conn.createStatement().execute("set hive.support.concurrency = false"); - return conn; - } - - @After - public void tearDown() throws Exception { - LlapBaseInputFormat.closeAll(); - hs2Conn.close(); - } - - @AfterClass - public static void afterTest() throws Exception { - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - } - - private void createTestTable(String tableName) throws Exception { - createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString()); - } - - public static void createTestTable(Connection connection, String database, String tableName, String srcFile) throws - Exception { - Statement stmt = connection.createStatement(); - - if (database != null) { - stmt.execute("CREATE DATABASE IF NOT EXISTS " + database); - stmt.execute("USE " + database); - } - - // create table - stmt.execute("DROP TABLE IF EXISTS " + tableName); - stmt.execute("CREATE TABLE " + tableName - + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'"); - - // load data - stmt.execute("load data local inpath '" + srcFile + "' into table " + tableName); - - ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); - assertTrue(res.next()); - assertEquals("val_238", res.getString(2)); - res.close(); - stmt.close(); - } - - private void createDataTypesTable(String tableName) throws Exception { - Statement stmt = hs2Conn.createStatement(); - - // create table - stmt.execute("DROP TABLE IF EXISTS " + tableName); - // tables with various types - stmt.execute("create table " + tableName - + " (c1 int, c2 boolean, c3 double, c4 string," - + " c5 array<int>, c6 map<int,string>, c7 map<string,string>," - + " c8 struct<r:string,s:int,t:double>," - + " c9 tinyint, c10 smallint, c11 float, c12 bigint," - + " c13 array<array<string>>," - + " c14 map<int, map<int,int>>," - + " c15 struct<r:int,s:struct<a:int,b:string>>," - + " c16 array<struct<m:map<string,string>,n:int>>," - + " c17 timestamp, " - + " c18 decimal(16,7), " - + " c19 binary, " - + " c20 date," - + " c21 varchar(20)," - + " c22 char(15)," - + " c23 binary" - + ")"); - stmt.execute("load data local inpath '" - + dataTypesFilePath.toString() + "' into table " + tableName); - stmt.close(); - } - - @Test(timeout = 60000) - public void testLlapInputFormatEndToEnd() throws Exception { - createTestTable("testtab1"); - - int rowCount; - - 
RowCollector rowCollector = new RowCollector(); - String query = "select * from testtab1 where under_col = 0"; - rowCount = processQuery(query, 1, rowCollector); - assertEquals(3, rowCount); - assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(0)); - assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(1)); - assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(2)); - - // Try empty rows query - rowCollector.rows.clear(); - query = "select * from testtab1 where true = false"; - rowCount = processQuery(query, 1, rowCollector); - assertEquals(0, rowCount); - } - - @Test(timeout = 60000) - public void testNonAsciiStrings() throws Exception { - createTestTable(hs2Conn, "nonascii", "testtab_nonascii", kvDataFilePath.toString()); - - RowCollector rowCollector = new RowCollector(); - String nonAscii = "à côté du garçon"; - String query = "select value, '" + nonAscii + "' from testtab_nonascii where under_col=0"; - int rowCount = processQuery("nonascii", query, 1, rowCollector); - assertEquals(3, rowCount); - - assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(0)); - assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(1)); - assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(2)); - } - - @Test(timeout = 60000) - public void testEscapedStrings() throws Exception { - createTestTable("testtab1"); - - RowCollector rowCollector = new RowCollector(); - String expectedVal1 = "'a',\"b\",\\c\\"; - String expectedVal2 = "multi\nline"; - String query = "select value, '\\'a\\',\"b\",\\\\c\\\\', 'multi\\nline' from testtab1 where under_col=0"; - int rowCount = processQuery(query, 1, rowCollector); - assertEquals(3, rowCount); - - assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(0)); - assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(1)); - assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(2)); - } - - @Test(timeout = 60000) - public void testDataTypes() throws Exception { - createDataTypesTable("datatypes"); - RowCollector2 rowCollector = new RowCollector2(); - String query = "select * from datatypes"; - int rowCount = processQuery(query, 1, rowCollector); - assertEquals(3, rowCount); - - // Verify schema - String[][] colNameTypes = new String[][] { - {"datatypes.c1", "int"}, - {"datatypes.c2", "boolean"}, - {"datatypes.c3", "double"}, - {"datatypes.c4", "string"}, - {"datatypes.c5", "array<int>"}, - {"datatypes.c6", "map<int,string>"}, - {"datatypes.c7", "map<string,string>"}, - {"datatypes.c8", "struct<r:string,s:int,t:double>"}, - {"datatypes.c9", "tinyint"}, - {"datatypes.c10", "smallint"}, - {"datatypes.c11", "float"}, - {"datatypes.c12", "bigint"}, - {"datatypes.c13", "array<array<string>>"}, - {"datatypes.c14", "map<int,map<int,int>>"}, - {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"}, - {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"}, - {"datatypes.c17", "timestamp"}, - {"datatypes.c18", "decimal(16,7)"}, - {"datatypes.c19", "binary"}, - {"datatypes.c20", "date"}, - {"datatypes.c21", "varchar(20)"}, - {"datatypes.c22", "char(15)"}, - {"datatypes.c23", "binary"}, - }; - FieldDesc fieldDesc; - assertEquals(23, rowCollector.numColumns); - for (int idx = 0; idx < rowCollector.numColumns; ++idx) { - fieldDesc = rowCollector.schema.getColumns().get(idx); - assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName()); - 
assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName()); - } - - // First row is all nulls - Object[] rowValues = rowCollector.rows.get(0); - for (int idx = 0; idx < rowCollector.numColumns; ++idx) { - assertEquals("idx=" + idx, null, rowValues[idx]); - } - - // Second Row - rowValues = rowCollector.rows.get(1); - assertEquals(Integer.valueOf(-1), rowValues[0]); - assertEquals(Boolean.FALSE, rowValues[1]); - assertEquals(Double.valueOf(-1.1d), rowValues[2]); - assertEquals("", rowValues[3]); - - List<?> c5Value = (List<?>) rowValues[4]; - assertEquals(0, c5Value.size()); - - Map<?,?> c6Value = (Map<?,?>) rowValues[5]; - assertEquals(0, c6Value.size()); - - Map<?,?> c7Value = (Map<?,?>) rowValues[6]; - assertEquals(0, c7Value.size()); - - List<?> c8Value = (List<?>) rowValues[7]; - assertEquals(null, c8Value.get(0)); - assertEquals(null, c8Value.get(1)); - assertEquals(null, c8Value.get(2)); - - assertEquals(Byte.valueOf((byte) -1), rowValues[8]); - assertEquals(Short.valueOf((short) -1), rowValues[9]); - assertEquals(Float.valueOf(-1.0f), rowValues[10]); - assertEquals(Long.valueOf(-1l), rowValues[11]); - - List<?> c13Value = (List<?>) rowValues[12]; - assertEquals(0, c13Value.size()); - - Map<?,?> c14Value = (Map<?,?>) rowValues[13]; - assertEquals(0, c14Value.size()); - - List<?> c15Value = (List<?>) rowValues[14]; - assertEquals(null, c15Value.get(0)); - assertEquals(null, c15Value.get(1)); - - List<?> c16Value = (List<?>) rowValues[15]; - assertEquals(0, c16Value.size()); - - assertEquals(null, rowValues[16]); - assertEquals(null, rowValues[17]); - assertEquals(null, rowValues[18]); - assertEquals(null, rowValues[19]); - assertEquals(null, rowValues[20]); - assertEquals(null, rowValues[21]); - assertEquals(null, rowValues[22]); - - // Third row - rowValues = rowCollector.rows.get(2); - assertEquals(Integer.valueOf(1), rowValues[0]); - assertEquals(Boolean.TRUE, rowValues[1]); - assertEquals(Double.valueOf(1.1d), rowValues[2]); - assertEquals("1", rowValues[3]); - - c5Value = (List<?>) rowValues[4]; - assertEquals(2, c5Value.size()); - assertEquals(Integer.valueOf(1), c5Value.get(0)); - assertEquals(Integer.valueOf(2), c5Value.get(1)); - - c6Value = (Map<?,?>) rowValues[5]; - assertEquals(2, c6Value.size()); - assertEquals("x", c6Value.get(Integer.valueOf(1))); - assertEquals("y", c6Value.get(Integer.valueOf(2))); - - c7Value = (Map<?,?>) rowValues[6]; - assertEquals(1, c7Value.size()); - assertEquals("v", c7Value.get("k")); - - c8Value = (List<?>) rowValues[7]; - assertEquals("a", c8Value.get(0)); - assertEquals(Integer.valueOf(9), c8Value.get(1)); - assertEquals(Double.valueOf(2.2d), c8Value.get(2)); - - assertEquals(Byte.valueOf((byte) 1), rowValues[8]); - assertEquals(Short.valueOf((short) 1), rowValues[9]); - assertEquals(Float.valueOf(1.0f), rowValues[10]); - assertEquals(Long.valueOf(1l), rowValues[11]); - - c13Value = (List<?>) rowValues[12]; - assertEquals(2, c13Value.size()); - List<?> listVal = (List<?>) c13Value.get(0); - assertEquals("a", listVal.get(0)); - assertEquals("b", listVal.get(1)); - listVal = (List<?>) c13Value.get(1); - assertEquals("c", listVal.get(0)); - assertEquals("d", listVal.get(1)); - - c14Value = (Map<?,?>) rowValues[13]; - assertEquals(2, c14Value.size()); - Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1)); - assertEquals(2, mapVal.size()); - assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11))); - assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13))); - mapVal = (Map<?,?>) 
c14Value.get(Integer.valueOf(2)); - assertEquals(1, mapVal.size()); - assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21))); - - c15Value = (List<?>) rowValues[14]; - assertEquals(Integer.valueOf(1), c15Value.get(0)); - listVal = (List<?>) c15Value.get(1); - assertEquals(2, listVal.size()); - assertEquals(Integer.valueOf(2), listVal.get(0)); - assertEquals("x", listVal.get(1)); - - c16Value = (List<?>) rowValues[15]; - assertEquals(2, c16Value.size()); - listVal = (List<?>) c16Value.get(0); - assertEquals(2, listVal.size()); - mapVal = (Map<?,?>) listVal.get(0); - assertEquals(0, mapVal.size()); - assertEquals(Integer.valueOf(1), listVal.get(1)); - listVal = (List<?>) c16Value.get(1); - mapVal = (Map<?,?>) listVal.get(0); - assertEquals(2, mapVal.size()); - assertEquals("b", mapVal.get("a")); - assertEquals("d", mapVal.get("c")); - assertEquals(Integer.valueOf(2), listVal.get(1)); - - assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]); - assertEquals(new BigDecimal("123456789.123456"), rowValues[17]); - assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]); - assertEquals(Date.valueOf("2013-01-01"), rowValues[19]); - assertEquals("abc123", rowValues[20]); - assertEquals("abc123 ", rowValues[21]); - assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]); - } - - private interface RowProcessor { - void process(Row row); - } - - private static class RowCollector implements RowProcessor { - ArrayList<String[]> rows = new ArrayList<String[]>(); - Schema schema = null; - int numColumns = 0; - - public void process(Row row) { - if (schema == null) { - schema = row.getSchema(); - numColumns = schema.getColumns().size(); - } - - String[] arr = new String[numColumns]; - for (int idx = 0; idx < numColumns; ++idx) { - Object val = row.getValue(idx); - arr[idx] = (val == null ? null : val.toString()); - } - rows.add(arr); - } - } - - // Save the actual values from each row as opposed to the String representation. 
- private static class RowCollector2 implements RowProcessor { - ArrayList<Object[]> rows = new ArrayList<Object[]>(); - Schema schema = null; - int numColumns = 0; - - public void process(Row row) { - if (schema == null) { - schema = row.getSchema(); - numColumns = schema.getColumns().size(); - } - - Object[] arr = new Object[numColumns]; - for (int idx = 0; idx < numColumns; ++idx) { - arr[idx] = row.getValue(idx); - } - rows.add(arr); - } - } - - private int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception { - return processQuery(null, query, numSplits, rowProcessor); - } - - private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception { - String url = miniHS2.getJdbcURL(); - String user = System.getProperty("user.name"); - String pwd = user; - String handleId = UUID.randomUUID().toString(); - - LlapRowInputFormat inputFormat = new LlapRowInputFormat(); - - // Get splits - JobConf job = new JobConf(conf); - job.set(LlapBaseInputFormat.URL_KEY, url); - job.set(LlapBaseInputFormat.USER_KEY, user); - job.set(LlapBaseInputFormat.PWD_KEY, pwd); - job.set(LlapBaseInputFormat.QUERY_KEY, query); - job.set(LlapBaseInputFormat.HANDLE_ID, handleId); - if (currentDatabase != null) { - job.set(LlapBaseInputFormat.DB_KEY, currentDatabase); - } - - InputSplit[] splits = inputFormat.getSplits(job, numSplits); - assertTrue(splits.length > 0); - - // Fetch rows from splits - boolean first = true; - int rowCount = 0; - for (InputSplit split : splits) { - System.out.println("Processing split " + split.getLocations()); - - int numColumns = 2; - RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null); - Row row = reader.createValue(); - while (reader.next(NullWritable.get(), row)) { - rowProcessor.process(row); - ++rowCount; - } - reader.close(); - } - LlapBaseInputFormat.close(handleId); - - return rowCount; - } - - /** - * Test CLI kill command of a query that is running. - * We spawn 2 threads - one running the query and - * the other attempting to cancel. - * We're using a dummy udf to simulate a query, - * that runs for a sufficiently long time. 
- * @throws Exception - */ - @Test - public void testKillQuery() throws Exception { - String tableName = "testtab1"; - createTestTable(tableName); - Connection con = hs2Conn; - Connection con2 = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - - String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); - Statement stmt1 = con.createStatement(); - Statement stmt2 = con2.createStatement(); - stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'"); - stmt1.close(); - final Statement stmt = con.createStatement(); - - ExceptionHolder tExecuteHolder = new ExceptionHolder(); - ExceptionHolder tKillHolder = new ExceptionHolder(); - - // Thread executing the query - Thread tExecute = new Thread(new Runnable() { - @Override - public void run() { - try { - System.out.println("Executing query: "); - // The test table has 500 rows, so total query time should be ~ 500*500ms - stmt.executeQuery("select sleepMsUDF(t1.under_col, 100), t1.under_col, t2.under_col " + - "from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col"); - fail("Expecting SQLException"); - } catch (SQLException e) { - tExecuteHolder.throwable = e; - } - } - }); - // Thread killing the query - Thread tKill = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(2000); - String queryId = ((HiveStatement) stmt).getQueryId(); - System.out.println("Killing query: " + queryId); - - stmt2.execute("kill query '" + queryId + "'"); - stmt2.close(); - } catch (Exception e) { - tKillHolder.throwable = e; - } - } - }); - - tExecute.start(); - tKill.start(); - tExecute.join(); - tKill.join(); - stmt.close(); - con2.close(); - - assertNotNull("tExecute", tExecuteHolder.throwable); - assertNull("tCancel", tKillHolder.throwable); - } - - private static class ExceptionHolder { - Throwable throwable; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java new file mode 100644 index 0000000..afb9837 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.List; +import org.apache.hadoop.hive.llap.FieldDesc; +import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.io.NullWritable; +import org.junit.BeforeClass; + +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat; + +/** + * TestJdbcWithMiniLlap for Arrow format + */ +public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap { + + + @BeforeClass + public static void beforeTest() throws Exception { + BaseJdbcWithMiniLlap.beforeTest(true); + } + + @Override + protected InputFormat<NullWritable, Row> getInputFormat() { + //For unit testing, no harm in hard-coding allocator ceiling to LONG.MAX_VALUE + return new LlapArrowRowInputFormat(Long.MAX_VALUE); + } + + // Currently MAP type is not supported. Add it back when Arrow 1.0 is released. + // See: SPARK-21187 + @Override + public void testDataTypes() throws Exception { + createDataTypesTable("datatypes"); + RowCollector2 rowCollector = new RowCollector2(); + String query = "select * from datatypes"; + int rowCount = processQuery(query, 1, rowCollector); + assertEquals(3, rowCount); + + // Verify schema + String[][] colNameTypes = new String[][] { + {"datatypes.c1", "int"}, + {"datatypes.c2", "boolean"}, + {"datatypes.c3", "double"}, + {"datatypes.c4", "string"}, + {"datatypes.c5", "array<int>"}, + {"datatypes.c6", "map<int,string>"}, + {"datatypes.c7", "map<string,string>"}, + {"datatypes.c8", "struct<r:string,s:int,t:double>"}, + {"datatypes.c9", "tinyint"}, + {"datatypes.c10", "smallint"}, + {"datatypes.c11", "float"}, + {"datatypes.c12", "bigint"}, + {"datatypes.c13", "array<array<string>>"}, + {"datatypes.c14", "map<int,map<int,int>>"}, + {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"}, + {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"}, + {"datatypes.c17", "timestamp"}, + {"datatypes.c18", "decimal(16,7)"}, + {"datatypes.c19", "binary"}, + {"datatypes.c20", "date"}, + {"datatypes.c21", "varchar(20)"}, + {"datatypes.c22", "char(15)"}, + {"datatypes.c23", "binary"}, + }; + FieldDesc fieldDesc; + assertEquals(23, rowCollector.numColumns); + for (int idx = 0; idx < rowCollector.numColumns; ++idx) { + fieldDesc = rowCollector.schema.getColumns().get(idx); + assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName()); + assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName()); + } + + // First row is all nulls + Object[] rowValues = rowCollector.rows.get(0); + for (int idx = 0; idx < rowCollector.numColumns; ++idx) { + assertEquals("idx=" + idx, null, rowValues[idx]); + } + + // Second Row + rowValues = rowCollector.rows.get(1); + assertEquals(Integer.valueOf(-1), rowValues[0]); + assertEquals(Boolean.FALSE, rowValues[1]); + assertEquals(Double.valueOf(-1.1d), rowValues[2]); + assertEquals("", rowValues[3]); + + List<?> c5Value = (List<?>) rowValues[4]; + assertEquals(0, c5Value.size()); + + //Map<?,?> c6Value = (Map<?,?>) rowValues[5]; + //assertEquals(0, c6Value.size()); + + //Map<?,?> c7Value = (Map<?,?>) rowValues[6]; + //assertEquals(0, c7Value.size()); + + List<?> c8Value = (List<?>) rowValues[7]; + assertEquals(null, c8Value.get(0)); + assertEquals(null, c8Value.get(1)); + assertEquals(null, c8Value.get(2)); + + assertEquals(Byte.valueOf((byte) -1), 
rowValues[8]); + assertEquals(Short.valueOf((short) -1), rowValues[9]); + assertEquals(Float.valueOf(-1.0f), rowValues[10]); + assertEquals(Long.valueOf(-1l), rowValues[11]); + + List<?> c13Value = (List<?>) rowValues[12]; + assertEquals(0, c13Value.size()); + + //Map<?,?> c14Value = (Map<?,?>) rowValues[13]; + //assertEquals(0, c14Value.size()); + + List<?> c15Value = (List<?>) rowValues[14]; + assertEquals(null, c15Value.get(0)); + assertEquals(null, c15Value.get(1)); + + //List<?> c16Value = (List<?>) rowValues[15]; + //assertEquals(0, c16Value.size()); + + assertEquals(null, rowValues[16]); + assertEquals(null, rowValues[17]); + assertEquals(null, rowValues[18]); + assertEquals(null, rowValues[19]); + assertEquals(null, rowValues[20]); + assertEquals(null, rowValues[21]); + assertEquals(null, rowValues[22]); + + // Third row + rowValues = rowCollector.rows.get(2); + assertEquals(Integer.valueOf(1), rowValues[0]); + assertEquals(Boolean.TRUE, rowValues[1]); + assertEquals(Double.valueOf(1.1d), rowValues[2]); + assertEquals("1", rowValues[3]); + + c5Value = (List<?>) rowValues[4]; + assertEquals(2, c5Value.size()); + assertEquals(Integer.valueOf(1), c5Value.get(0)); + assertEquals(Integer.valueOf(2), c5Value.get(1)); + + //c6Value = (Map<?,?>) rowValues[5]; + //assertEquals(2, c6Value.size()); + //assertEquals("x", c6Value.get(Integer.valueOf(1))); + //assertEquals("y", c6Value.get(Integer.valueOf(2))); + + //c7Value = (Map<?,?>) rowValues[6]; + //assertEquals(1, c7Value.size()); + //assertEquals("v", c7Value.get("k")); + + c8Value = (List<?>) rowValues[7]; + assertEquals("a", c8Value.get(0)); + assertEquals(Integer.valueOf(9), c8Value.get(1)); + assertEquals(Double.valueOf(2.2d), c8Value.get(2)); + + assertEquals(Byte.valueOf((byte) 1), rowValues[8]); + assertEquals(Short.valueOf((short) 1), rowValues[9]); + assertEquals(Float.valueOf(1.0f), rowValues[10]); + assertEquals(Long.valueOf(1l), rowValues[11]); + + c13Value = (List<?>) rowValues[12]; + assertEquals(2, c13Value.size()); + List<?> listVal = (List<?>) c13Value.get(0); + assertEquals("a", listVal.get(0)); + assertEquals("b", listVal.get(1)); + listVal = (List<?>) c13Value.get(1); + assertEquals("c", listVal.get(0)); + assertEquals("d", listVal.get(1)); + + //c14Value = (Map<?,?>) rowValues[13]; + //assertEquals(2, c14Value.size()); + //Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1)); + //assertEquals(2, mapVal.size()); + //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11))); + //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13))); + //mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2)); + //assertEquals(1, mapVal.size()); + //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21))); + + c15Value = (List<?>) rowValues[14]; + assertEquals(Integer.valueOf(1), c15Value.get(0)); + listVal = (List<?>) c15Value.get(1); + assertEquals(2, listVal.size()); + assertEquals(Integer.valueOf(2), listVal.get(0)); + assertEquals("x", listVal.get(1)); + + //c16Value = (List<?>) rowValues[15]; + //assertEquals(2, c16Value.size()); + //listVal = (List<?>) c16Value.get(0); + //assertEquals(2, listVal.size()); + //mapVal = (Map<?,?>) listVal.get(0); + //assertEquals(0, mapVal.size()); + //assertEquals(Integer.valueOf(1), listVal.get(1)); + //listVal = (List<?>) c16Value.get(1); + //mapVal = (Map<?,?>) listVal.get(0); + //assertEquals(2, mapVal.size()); + //assertEquals("b", mapVal.get("a")); + //assertEquals("d", mapVal.get("c")); + //assertEquals(Integer.valueOf(2), listVal.get(1)); + + 
assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]); + assertEquals(new BigDecimal("123456789.123456"), rowValues[17]); + assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]); + assertEquals(Date.valueOf("2013-01-01"), rowValues[19]); + assertEquals("abc123", rowValues[20]); + assertEquals("abc123 ", rowValues[21]); + assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]); + } + +} + http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java new file mode 100644 index 0000000..809068f --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.hive.llap.LlapRowInputFormat; +import org.junit.BeforeClass; +import org.junit.Before; +import org.junit.After; +import org.apache.hadoop.mapred.InputFormat; + +/** + * TestJdbcWithMiniLlap for llap Row format. 
+ */ +public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap { + + @BeforeClass + public static void beforeTest() throws Exception { + BaseJdbcWithMiniLlap.beforeTest(false); + } + + @Override + protected InputFormat<NullWritable, Row> getInputFormat() { + return new LlapRowInputFormat(); + } + +} + http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java index a9ed3d2..5316aa7 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java @@ -22,25 +22,15 @@ import com.google.common.base.Preconditions; import java.io.BufferedInputStream; import java.io.Closeable; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.DataInputStream; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.Schema; import org.apache.hadoop.hive.llap.io.ChunkedInputStream; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.JobConf; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,52 +139,61 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor throw new IOException("Hit end of input, but did not find expected end of data indicator"); } - // There should be a reader event available, or coming soon, so okay to be blocking call. - ReaderEvent event = getReaderEvent(); - switch (event.getEventType()) { - case DONE: - break; - default: - throw new IOException("Expected reader event with done status, but got " - + event.getEventType() + " with message " + event.getMessage()); - } + processReaderEvent(); return false; } } catch (IOException io) { - try { - if (Thread.interrupted()) { - // Either we were interrupted by one of: - // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue - // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming. - // Either way we should not try to block trying to read the reader events queue. - if (readerEvents.isEmpty()) { - // Case 2. - throw io; - } else { - // Case 1. Fail the reader, sending back the error we received from the reader event. - ReaderEvent event = getReaderEvent(); - switch (event.getEventType()) { - case ERROR: - throw new IOException("Received reader event error: " + event.getMessage(), io); - default: - throw new IOException("Got reader event type " + event.getEventType() - + ", expected error event", io); - } - } - } else { - // If we weren't interrupted, just propagate the error + failOnInterruption(io); + return false; + } + } + + protected void processReaderEvent() throws IOException { + // There should be a reader event available, or coming soon, so okay to be blocking call. 
+ ReaderEvent event = getReaderEvent(); + switch (event.getEventType()) { + case DONE: + break; + default: + throw new IOException("Expected reader event with done status, but got " + + event.getEventType() + " with message " + event.getMessage()); + } + } + + protected void failOnInterruption(IOException io) throws IOException { + try { + if (Thread.interrupted()) { + // Either we were interrupted by one of: + // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue + // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming. + // Either way we should not try to block trying to read the reader events queue. + if (readerEvents.isEmpty()) { + // Case 2. throw io; + } else { + // Case 1. Fail the reader, sending back the error we received from the reader event. + ReaderEvent event = getReaderEvent(); + switch (event.getEventType()) { + case ERROR: + throw new IOException("Received reader event error: " + event.getMessage(), io); + default: + throw new IOException("Got reader event type " + event.getEventType() + + ", expected error event", io); + } } - } finally { - // The external client handling umbilical responses and the connection to read the incoming - // data are not coupled. Calling close() here to make sure an error in one will cause the - // other to be closed as well. - try { - close(); - } catch (Exception err) { - // Don't propagate errors from close() since this will lose the original error above. - LOG.error("Closing RecordReader due to error and hit another error during close()", err); - } + } else { + // If we weren't interrupted, just propagate the error + throw io; + } + } finally { + // The external client handling umbilical responses and the connection to read the incoming + // data are not coupled. Calling close() here to make sure an error in one will cause the + // other to be closed as well. + try { + close(); + } catch (Exception err) { + // Don't propagate errors from close() since this will lose the original error above. 
+ LOG.error("Closing RecordReader due to error and hit another error during close()", err); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java index 1cfbf3a..6cc1d17 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -70,20 +69,20 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> { private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class); protected final Configuration conf; - protected final RecordReader<NullWritable, BytesWritable> reader; + protected final RecordReader reader; protected final Schema schema; protected final AbstractSerDe serde; - protected final BytesWritable data; + protected final Writable data; public LlapRowRecordReader(Configuration conf, Schema schema, - RecordReader<NullWritable, BytesWritable> reader) throws IOException { + RecordReader<NullWritable, ? extends Writable> reader) throws IOException { this.conf = conf; this.schema = schema; this.reader = reader; - this.data = new BytesWritable(); + this.data = reader.createValue(); try { - serde = initSerDe(conf); + this.serde = initSerDe(conf); } catch (SerDeException err) { throw new IOException(err); } @@ -118,7 +117,7 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> { public boolean next(NullWritable key, Row value) throws IOException { Preconditions.checkArgument(value != null); - boolean hasNext = reader.next(key, data); + boolean hasNext = reader.next(key, data); if (hasNext) { // Deserialize data to column values, and populate the row record Object rowObj; @@ -216,7 +215,7 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> { return convertedVal; } - static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) { + protected static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) { Schema structSchema = row.getSchema(); // Add struct field data to the Row List<? 
extends StructField> structFields = soi.getAllStructFieldRefs(); @@ -230,6 +229,11 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> { } } + //Factory method for serDe + protected AbstractSerDe createSerDe() throws SerDeException { + return new LazyBinarySerDe(); + } + protected AbstractSerDe initSerDe(Configuration conf) throws SerDeException { Properties props = new Properties(); StringBuilder columnsBuffer = new StringBuilder(); @@ -249,9 +253,9 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> { props.put(serdeConstants.LIST_COLUMNS, columns); props.put(serdeConstants.LIST_COLUMN_TYPES, types); props.put(serdeConstants.ESCAPE_CHAR, "\\"); - AbstractSerDe serde = new LazyBinarySerDe(); - serde.initialize(conf, props); + AbstractSerDe createdSerDe = createSerDe(); + createdSerDe.initialize(conf, props); - return serde; + return createdSerDe; } } http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/pom.xml ---------------------------------------------------------------------- diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml index ed4704b..295d3e6 100644 --- a/llap-ext-client/pom.xml +++ b/llap-ext-client/pom.xml @@ -41,6 +41,11 @@ </dependency> <dependency> <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> <artifactId>hive-llap-client</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java new file mode 100644 index 0000000..d9c5666 --- /dev/null +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap; + +import com.google.common.base.Preconditions; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable; +import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; + +/* + * Read from Arrow stream batch-by-batch + */ +public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrapperWritable> { + + private BufferAllocator allocator; + private ArrowStreamReader arrowStreamReader; + + public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz, + JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException { + super(in, schema, clazz, job, client, socket); + allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit); + this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator); + } + + @Override + public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException { + try { + // Need a way to know what thread to interrupt, since this is a blocking thread. + setReaderThread(Thread.currentThread()); + + boolean hasInput = arrowStreamReader.loadNextBatch(); + if (hasInput) { + VectorSchemaRoot vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot(); + //There must be at least one column vector + Preconditions.checkState(vectorSchemaRoot.getFieldVectors().size() > 0); + if(vectorSchemaRoot.getFieldVectors().get(0).getValueCount() == 0) { + //An empty batch will appear at the end of the stream + return false; + } + value.setVectorSchemaRoot(arrowStreamReader.getVectorSchemaRoot()); + return true; + } else { + processReaderEvent(); + return false; + } + } catch (IOException io) { + failOnInterruption(io); + return false; + } + } + + @Override + public void close() throws IOException { + arrowStreamReader.close(); + } + +} + http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java new file mode 100644 index 0000000..fafbdee --- /dev/null +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import java.io.IOException; + +/* + * Adapts an Arrow batch reader to a row reader + */ +public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> { + + private LlapBaseInputFormat baseInputFormat; + + public LlapArrowRowInputFormat(long arrowAllocatorLimit) { + baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + return baseInputFormat.getSplits(job, numSplits); + } + + @Override + public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter) + throws IOException { + LlapInputSplit llapSplit = (LlapInputSplit) split; + LlapArrowBatchRecordReader reader = + (LlapArrowBatchRecordReader) baseInputFormat.getRecordReader(llapSplit, job, reporter); + return new LlapArrowRowRecordReader(job, reader.getSchema(), reader); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java new file mode 100644 index 0000000..d4179d5 --- /dev/null +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import com.google.common.base.Preconditions; +import org.apache.arrow.vector.FieldVector; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe; +import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * Buffers a batch for reading one row at a time. 
+ */ +public class LlapArrowRowRecordReader extends LlapRowRecordReader { + + private static final Logger LOG = LoggerFactory.getLogger(LlapArrowRowRecordReader.class); + private int rowIndex = 0; + private int batchSize = 0; + + //Buffer one batch at a time, for row retrieval + private Object[][] currentBatch; + + public LlapArrowRowRecordReader(Configuration conf, Schema schema, + RecordReader<NullWritable, ? extends Writable> reader) throws IOException { + super(conf, schema, reader); + } + + @Override + public boolean next(NullWritable key, Row value) throws IOException { + Preconditions.checkArgument(value != null); + boolean hasNext = false; + ArrowWrapperWritable batchData = (ArrowWrapperWritable) data; + if((batchSize == 0) || (rowIndex == batchSize)) { + //This is either the first batch or we've used up the current batch buffer + batchSize = 0; + rowIndex = 0; + hasNext = reader.next(key, data); + if(hasNext) { + //There is another batch to buffer + try { + List<FieldVector> vectors = batchData.getVectorSchemaRoot().getFieldVectors(); + //hasNext implies there is some column in the batch + Preconditions.checkState(vectors.size() > 0); + //All the vectors have the same length, + //we can get the number of rows from the first vector + batchSize = vectors.get(0).getValueCount(); + ArrowWrapperWritable wrapper = new ArrowWrapperWritable(batchData.getVectorSchemaRoot()); + currentBatch = (Object[][]) serde.deserialize(wrapper); + StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector(); + setRowFromStruct(value, currentBatch[rowIndex], rowOI); + } catch (Exception e) { + LOG.error("Failed to fetch Arrow batch", e); + throw new RuntimeException(e); + } + } + //There were no more batches AND + //this is either the first batch or we've used up the current batch buffer. 
+ //goto return false + } else if(rowIndex < batchSize) { + //Take a row from the current buffered batch + hasNext = true; + StructObjectInspector rowOI = null; + try { + rowOI = (StructObjectInspector) serde.getObjectInspector(); + } catch (SerDeException e) { + throw new RuntimeException(e); + } + setRowFromStruct(value, currentBatch[rowIndex], rowOI); + } + //Always inc the batch buffer index + //If we return false, it is just a noop + rowIndex++; + return hasNext; + } + + protected AbstractSerDe createSerDe() throws SerDeException { + return new ArrowColumnarBatchSerDe(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index f4c7fa4..ef03be6 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -49,15 +49,15 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrB import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder; import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; -import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable; import org.apache.hadoop.hive.registry.ServiceInstanceSet; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -104,6 +104,8 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> private String user; // "hive", private String pwd; // "" private String query; + private boolean useArrow; + private long arrowAllocatorLimit; private final Random rand = new Random(); public static final String URL_KEY = "llap.if.hs2.connection"; @@ -123,7 +125,14 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> this.query = query; } - public LlapBaseInputFormat() {} + public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) { + this.useArrow = useArrow; + this.arrowAllocatorLimit = arrowAllocatorLimit; + } + + public LlapBaseInputFormat() { + this.useArrow = false; + } @SuppressWarnings("unchecked") @@ -195,8 +204,16 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> LOG.info("Registered id: " + fragmentId); @SuppressWarnings("rawtypes") - LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), - llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable)socket); + LlapBaseRecordReader recordReader; + if(useArrow) { + recordReader = new LlapArrowBatchRecordReader( + socket.getInputStream(), llapSplit.getSchema(), + ArrowWrapperWritable.class, job, llapClient, socket, + arrowAllocatorLimit); + } else { + 
recordReader = new LlapBaseRecordReader(socket.getInputStream(), + llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket); + } umbilicalResponder.setRecordReader(recordReader); return recordReader; } http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a963f8d..77074df 100644 --- a/pom.xml +++ b/pom.xml @@ -119,6 +119,7 @@ <antlr.version>3.5.2</antlr.version> <apache-directory-server.version>1.5.6</apache-directory-server.version> <apache-directory-clientapi.version>0.1</apache-directory-clientapi.version> + <!-- Include arrow for LlapOutputFormatService --> <arrow.version>0.8.0</arrow.version> <avatica.version>1.11.0</avatica.version> <avro.version>1.7.7</avro.version>
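
To see how the pieces introduced above fit together for an external client, here is a minimal, hypothetical row-level sketch. It is not part of this patch: URL_KEY is the constant visible in the LlapBaseInputFormat hunk above, while QUERY_KEY, USER_KEY and PWD_KEY are assumed to be the pre-existing property constants of that class; the JDBC URL, query, and the 1 GB allocator limit are placeholders.

import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class ArrowRowClientSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://hs2host:10000/default");  // placeholder URL
    job.set(LlapBaseInputFormat.QUERY_KEY, "select * from some_table");          // placeholder query
    job.set(LlapBaseInputFormat.USER_KEY, System.getProperty("user.name"));
    job.set(LlapBaseInputFormat.PWD_KEY, "");

    // Arrow allocator limit of 1 GB, chosen arbitrarily for the sketch.
    LlapArrowRowInputFormat inputFormat = new LlapArrowRowInputFormat(1024L * 1024 * 1024);
    InputSplit[] splits = inputFormat.getSplits(job, 1);
    for (InputSplit split : splits) {
      RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
      Row row = reader.createValue();
      while (reader.next(NullWritable.get(), row)) {
        // Columns come back as plain Java objects (Integer, String, List, ...), which is
        // what the data-type assertions earlier in this diff check for. getValue(int) is
        // assumed available on the pre-existing Row class.
        System.out.println(row.getValue(0));
      }
      reader.close();
    }
  }
}

Row-at-a-time access costs an extra deserialization pass: LlapArrowRowRecordReader buffers each Arrow batch as an Object[][] and replays it through setRowFromStruct(), so clients that can work on whole batches may prefer the batch-level sketch below.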
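
The Arrow constructor added to LlapBaseInputFormat (useArrow plus an allocator limit) hands back a LlapArrowBatchRecordReader whose value type is ArrowWrapperWritable wrapping a VectorSchemaRoot. A batch-level sketch, again not part of the patch, reusing a JobConf and a split obtained as in the row-level sketch above; createValue() is assumed to return an empty ArrowWrapperWritable (this patch modifies that class, but its new constructors are not shown in this diff), and Long.MAX_VALUE stands in for an effectively unbounded allocator limit.

import java.util.List;
import org.apache.arrow.vector.FieldVector;
import org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

public class ArrowBatchClientSketch {
  // Drains one LLAP split batch-by-batch and prints each batch's dimensions.
  static void readBatches(JobConf job, InputSplit split) throws Exception {
    // 'true' selects the Arrow path; the second argument is passed through to
    // RootAllocatorFactory.getOrCreateRootAllocator() as the allocator limit.
    LlapBaseInputFormat inputFormat = new LlapBaseInputFormat(true, Long.MAX_VALUE);
    LlapArrowBatchRecordReader reader =
        (LlapArrowBatchRecordReader) inputFormat.getRecordReader(split, job, Reporter.NULL);
    try {
      ArrowWrapperWritable batch = reader.createValue();  // assumed to yield an empty wrapper
      // next() returns false once the empty end-of-stream batch arrives and the DONE
      // reader event has been processed, per LlapArrowBatchRecordReader above.
      while (reader.next(NullWritable.get(), batch)) {
        List<FieldVector> vectors = batch.getVectorSchemaRoot().getFieldVectors();
        int rowCount = vectors.get(0).getValueCount();  // every vector has the same row count
        System.out.println(rowCount + " rows x " + vectors.size() + " columns");
      }
    } finally {
      reader.close();
    }
  }
}

Note that close() tears down the ArrowStreamReader, and on errors the inherited failOnInterruption() already calls close(), so the umbilical client and the data socket are shut down together.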
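
The LlapRowRecordReader changes (the reader and data fields generalized to Writable, plus the protected createSerDe() factory) are what allow LlapArrowRowRecordReader to reuse the existing row-materialization code while swapping LazyBinarySerDe for ArrowColumnarBatchSerDe. A hypothetical subclass, purely to illustrate the hook; LazySimpleSerDe is an arbitrary example and nothing in this patch wires it up:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.LlapRowRecordReader;
import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordReader;

public class TextSerDeRowRecordReaderSketch extends LlapRowRecordReader {

  public TextSerDeRowRecordReaderSketch(Configuration conf, Schema schema,
      RecordReader<NullWritable, ? extends Writable> reader) throws IOException {
    super(conf, schema, reader);
  }

  @Override
  protected AbstractSerDe createSerDe() throws SerDeException {
    // initSerDe() in the base class still builds the column/type properties and calls
    // initialize(); only the serde implementation is swapped here.
    return new LazySimpleSerDe();
  }
}

Because data is now reader.createValue() rather than a hard-coded BytesWritable, the wrapped reader's value type passes through untouched, which is what lets ArrowWrapperWritable flow from LlapArrowBatchRecordReader into the row adapter.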
