This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new f6aa916a171 HIVE-27765: Backport of HIVE-20052, HIVE-20093, HIVE-20203, HIVE-20290, HIVE-20300, HIVE-20312, HIVE-20044, HIVE-21966 to branch-3 (#4772)
f6aa916a171 is described below
commit f6aa916a17109d4f64e7dd878b49912b79dd8d75
Author: Aman Raj <[email protected]>
AuthorDate: Tue Oct 10 10:41:32 2023 +0530
HIVE-27765: Backport of HIVE-20052, HIVE-20093, HIVE-20203, HIVE-20290, HIVE-20300, HIVE-20312, HIVE-20044, HIVE-21966 to branch-3 (#4772)
* HIVE-20052: Arrow serde should fill ArrowColumnVector(Decimal) with the given schema precision/scale
* HIVE-20093: LlapOutputFormatService: Use ArrowBuf with Netty for Accounting
* HIVE-20203: Arrow SerDe leaks a DirectByteBuffer
* HIVE-20290: Lazy initialize ArrowColumnarBatchSerDe so it doesn't allocate buffers during GetSplits
* HIVE-20300: VectorFileSinkArrowOperator
* HIVE-20312: Allow Arrow clients to use their own BufferAllocator with LlapOutputFormatService
* HIVE-20044: Arrow SerDe should pad char values and handle empty strings correctly
* HIVE-21966: Llap external client - Arrow Serializer throws ArrayIndexOutOfBoundsException in some cases
---------
Co-authored-by: Eric Wohlstadter <[email protected]>
Co-authored-by: Nikhil Gupta <[email protected]>
Co-authored-by: Teddy Choi <[email protected]>
Co-authored-by: Shubham Chaurasia <[email protected]>
Signed-off-by: Sankar Hariappan <[email protected]>
Closes (#4772)
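Editorial usage note for the HIVE-20312 item above: external clients can now hand LlapBaseInputFormat their own Arrow BufferAllocator. The sketch below is not part of the patch; it only uses the constructor and factory methods shown in the diff, and the class name, allocator name, and limit are illustrative assumptions.

import org.apache.arrow.memory.BufferAllocator;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;

public class ArrowAllocatorClientSketch {
  @SuppressWarnings("rawtypes")
  public static LlapBaseInputFormat newArrowInputFormat(long limitBytes) {
    // Child allocator: usage is accounted per client while memory still comes from the shared root pool.
    BufferAllocator clientAllocator = RootAllocatorFactory.INSTANCE
        .getOrCreateRootAllocator(limitBytes)
        .newChildAllocator("llap-arrow-client", 0, limitBytes); // allocator name is a placeholder
    // Constructor added by this backport; the reader closes this allocator in close()
    // once every buffer read through it has been released.
    return new LlapBaseInputFormat(true, clientAllocator);
  }
}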
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 8 +-
.../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 45 +-
.../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 7 +-
.../apache/hive/jdbc/TestJdbcWithMiniLlapRow.java | 6 +-
...w.java => TestJdbcWithMiniLlapVectorArrow.java} | 297 ++++---
.../hive/llap/LlapArrowBatchRecordReader.java | 15 +-
.../hadoop/hive/llap/LlapArrowRowInputFormat.java | 14 +-
.../hadoop/hive/llap/LlapBaseInputFormat.java | 25 +-
.../hadoop/hive/llap/LlapArrowRecordWriter.java | 25 +-
.../hive/llap/WritableByteChannelAdapter.java | 12 +-
.../filesink/VectorFileSinkArrowOperator.java | 180 +++++
.../hive/ql/io/arrow/ArrowColumnarBatchSerDe.java | 20 +-
.../hive/ql/io/arrow/ArrowWrapperWritable.java | 19 +
.../apache/hadoop/hive/ql/io/arrow/Serializer.java | 865 +++++++++++++++------
.../hive/ql/optimizer/physical/Vectorizer.java | 60 +-
.../ql/io/arrow/TestArrowColumnarBatchSerDe.java | 53 ++
.../ql/exec/vector/expressions/StringExpr.java | 15 +
17 files changed, 1268 insertions(+), 398 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3ec99315a27..bf20a78b588 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2682,6 +2682,8 @@ public class HiveConf extends Configuration {
// For Arrow SerDe
HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", Long.MAX_VALUE,
"Arrow root allocator memory size limitation in bytes."),
+ HIVE_ARROW_BATCH_ALLOCATOR_LIMIT("hive.arrow.batch.allocator.limit", 10_000_000_000L,
+ "Max bytes per arrow batch. This is a threshold, the memory is not pre-allocated."),
HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows sent in one Arrow batch."),
// For Druid storage handler
@@ -3690,7 +3692,11 @@ public class HiveConf extends Configuration {
"internal use only. When false, don't suppress fatal exceptions
like\n" +
"NullPointerException, etc so the query will fail and assure it will
be noticed",
true),
-
+ HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED(
+ "hive.vectorized.execution.filesink.arrow.native.enabled", true,
+ "This flag should be set to true to enable the native vectorization\n"
+
+ "of queries using the Arrow SerDe and FileSink.\n" +
+ "The default value is true."),
HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true, "This property
has been extended to control "
+ "whether to check, convert, and normalize partition value to conform
to its column type in "
+ "partition operations including but not limited to insert, such as
alter, describe etc."),
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 5cf765d8eb8..fbcd229d224 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -102,38 +102,20 @@ import org.apache.hadoop.mapred.InputFormat;
* by sub-classes in a {@link org.junit.BeforeClass} initializer
*/
public abstract class BaseJdbcWithMiniLlap {
- private static MiniHS2 miniHS2 = null;
+
private static String dataFileDir;
private static Path kvDataFilePath;
private static Path dataTypesFilePath;
- private static HiveConf conf = null;
- private static Connection hs2Conn = null;
+ protected static MiniHS2 miniHS2 = null;
+ protected static HiveConf conf = null;
+ protected static Connection hs2Conn = null;
// This method should be called by sub-classes in a @BeforeClass initializer
- public static MiniHS2 beforeTest(boolean useArrow) throws Exception {
+ public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
+ conf = inputConf;
Class.forName(MiniHS2.getJdbcDriverName());
-
- String confDir = "../../data/conf/llap/";
- if (confDir != null && !confDir.isEmpty()) {
- HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
- System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
- }
-
- conf = new HiveConf();
- conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
- if(useArrow) {
- conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
- } else {
- conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
- }
-
- conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
- + "/tez-site.xml"));
-
miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
-
dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
kvDataFilePath = new Path(dataFileDir, "kv1.txt");
dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
@@ -143,6 +125,19 @@ public abstract class BaseJdbcWithMiniLlap {
return miniHS2;
}
+ static HiveConf defaultConf() throws Exception {
+ String confDir = "../../data/conf/llap/";
+ if (confDir != null && !confDir.isEmpty()) {
+ HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
+ System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+ }
+ HiveConf defaultConf = new HiveConf();
+ defaultConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ defaultConf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml"));
+ return defaultConf;
+ }
+
@Before
public void setUp() throws Exception {
hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
@@ -549,6 +544,8 @@ public abstract class BaseJdbcWithMiniLlap {
rowProcessor.process(row);
++rowCount;
}
+ //In arrow-mode this will throw exception unless all buffers have been released
+ //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader
reader.close();
}
LlapBaseInputFormat.close(handleId);
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index 0c6acd8495c..3dcc4928b1a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.junit.BeforeClass;
import org.junit.Ignore;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.AfterClass;
import org.junit.Test;
@@ -62,9 +64,10 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
@BeforeClass
public static void beforeTest() throws Exception {
- HiveConf conf = new HiveConf();
+ HiveConf conf = defaultConf();
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
MiniHS2.cleanupLocalDir();
- miniHS2 = BaseJdbcWithMiniLlap.beforeTest(true);
+ miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf);
dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
index 809068fe3e7..d954d0e2fe6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
@@ -25,6 +25,8 @@ import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.After;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
/**
* TestJdbcWithMiniLlap for llap Row format.
@@ -33,7 +35,9 @@ public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap {
@BeforeClass
public static void beforeTest() throws Exception {
- BaseJdbcWithMiniLlap.beforeTest(false);
+ HiveConf conf = defaultConf();
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
+ BaseJdbcWithMiniLlap.beforeTest(conf);
}
@Override
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
similarity index 54%
copy from itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
copy to itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
index 0c6acd8495c..35eda6cb0a6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
@@ -18,69 +18,42 @@
package org.apache.hive.jdbc;
+import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
+
import java.math.BigDecimal;
+
+import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.Timestamp;
+
+import java.sql.Statement;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.llap.FieldDesc;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.junit.AfterClass;
-import org.junit.Test;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Connection;
-import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
-import org.apache.hive.jdbc.miniHS2.MiniHS2;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.junit.Test;
/**
- * TestJdbcWithMiniLlap for Arrow format
+ * TestJdbcWithMiniLlap for Arrow format with vectorized output sink
*/
-@Ignore("unstable HIVE-23549")
-public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
+public class TestJdbcWithMiniLlapVectorArrow extends BaseJdbcWithMiniLlap {
- private static MiniHS2 miniHS2 = null;
- private static final String tableName = "testJdbcMinihs2Tbl";
- private static String dataFileDir;
- private static final String testDbName = "testJdbcMinihs2";
-
- private static class ExceptionHolder {
- Throwable throwable;
- }
@BeforeClass
public static void beforeTest() throws Exception {
- HiveConf conf = new HiveConf();
- MiniHS2.cleanupLocalDir();
- miniHS2 = BaseJdbcWithMiniLlap.beforeTest(true);
- dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
-
- Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
- System.getProperty("user.name"), "bar");
- Statement stmt = conDefault.createStatement();
- stmt.execute("drop database if exists " + testDbName + " cascade");
- stmt.execute("create database " + testDbName);
- stmt.close();
- conDefault.close();
- }
-
- @AfterClass
- public static void afterTest() {
- if (miniHS2 != null && miniHS2.isStarted()) {
- miniHS2.stop();
- }
+ HiveConf conf = defaultConf();
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+ conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
+ BaseJdbcWithMiniLlap.beforeTest(conf);
}
@Override
@@ -266,94 +239,172 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
}
- /**
- * SleepMsUDF
- */
- public static class SleepMsUDF extends UDF {
- public Integer evaluate(int value, int ms) {
- try {
- Thread.sleep(ms);
- } catch (InterruptedException e) {
- // No-op
- }
- return value;
+
+ @Test
+ public void testTypesNestedInListWithLimitAndFilters() throws Exception {
+ try (Statement statement = hs2Conn.createStatement()) {
+ statement.execute("CREATE TABLE complex_tbl(c1 array<string>, " +
+ "c2 array<struct<f1:string,f2:string>>, " +
+ "c3 array<array<struct<f1:string,f2:string>>>, " +
+ "c4 int) STORED AS ORC");
+
+ statement.executeUpdate("INSERT INTO complex_tbl VALUES " +
+ "(" +
+ "ARRAY('a1', 'a2', 'a3', null), " +
+ "ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3',
'f2','a4')), " +
+ "ARRAY((ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'),
NAMED_STRUCT('f1','a3', 'f2','a4')))), " +
+ "1), " +
+ "(" +
+ "ARRAY('b1'), " +
+ "ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3',
'f2','b4')), " +
+ "ARRAY((ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'),
NAMED_STRUCT('f1','b3', 'f2','b4'))), " +
+ "(ARRAY(NAMED_STRUCT('f1','b5', 'f2','b6'), NAMED_STRUCT('f1','b7',
'f2','b8')))), " +
+ "2), " +
+ "(" +
+ "ARRAY('c1', 'c2'), ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'),
NAMED_STRUCT('f1','c3', 'f2','c4'), " +
+ "NAMED_STRUCT('f1','c5', 'f2','c6')),
ARRAY((ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), " +
+ "NAMED_STRUCT('f1','c3', 'f2','c4'))),
(ARRAY(NAMED_STRUCT('f1','c5', 'f2','c6'), " +
+ "NAMED_STRUCT('f1','c7', 'f2','c8'))),
(ARRAY(NAMED_STRUCT('f1','c9', 'f2','c10'), " +
+ "NAMED_STRUCT('f1','c11', 'f2','c12')))), " +
+ "3), " +
+ "(" +
+ "ARRAY(null), " +
+ "ARRAY(NAMED_STRUCT('f1','d1', 'f2','d2'), NAMED_STRUCT('f1','d3',
'f2','d4'), " +
+ "NAMED_STRUCT('f1','d5', 'f2','d6'), NAMED_STRUCT('f1','d7',
'f2','d8')), " +
+ "ARRAY((ARRAY(NAMED_STRUCT('f1','d1', 'f2', 'd2')))), " +
+ "4)");
+
}
+
+ List<Object[]> expected = new ArrayList<>();
+ expected.add(new Object[]{
+ asList("a1", "a2", "a3", null),
+ asList(asList("a1", "a2"), asList("a3", "a4")),
+ asList(asList(asList("a1", "a2"), asList("a3", "a4"))),
+ 1
+ });
+ expected.add(new Object[]{
+ asList("b1"),
+ asList(asList("b1", "b2"), asList("b3", "b4")),
+ asList(asList(asList("b1", "b2"), asList("b3", "b4")),
asList(asList("b5", "b6"), asList("b7", "b8"))),
+ 2
+ });
+ expected.add(new Object[]{
+ asList("c1", "c2"),
+ asList(asList("c1", "c2"), asList("c3", "c4"), asList("c5", "c6")),
+ asList(asList(asList("c1", "c2"), asList("c3", "c4")),
asList(asList("c5", "c6"), asList("c7", "c8")),
+ asList(asList("c9", "c10"), asList("c11", "c12"))),
+ 3
+ });
+ List<String> nullList = new ArrayList<>();
+ nullList.add(null);
+ expected.add(new Object[]{
+ nullList,
+ asList(asList("d1", "d2"), asList("d3", "d4"), asList("d5", "d6"),
asList("d7", "d8")),
+ asList(asList(asList("d1", "d2"))),
+ 4
+ });
+
+ // test without limit and filters (i.e. VectorizedRowBatch#selectedInUse=false)
+ RowCollector2 rowCollector = new RowCollector2();
+ String query = "select * from complex_tbl";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(0),
+ expected.get(1),
+ expected.get(2),
+ expected.get(3));
+
+ // test with filter
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl where c4 > 1 ";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(1), expected.get(2), expected.get(3));
+
+ // test with limit
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl limit 3";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2));
+
+ // test with filters and limit
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl where c4 > 1 limit 2";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(1), expected.get(2));
+
}
- /**
- * Test CLI kill command of a query that is running.
- * We spawn 2 threads - one running the query and
- * the other attempting to cancel.
- * We're using a dummy udf to simulate a query,
- * that runs for a sufficiently long time.
- * @throws Exception
- */
@Test
- public void testKillQuery() throws Exception {
- Connection con =
BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
- System.getProperty("user.name"), "bar");
- Connection con2 =
BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
- System.getProperty("user.name"), "bar");
-
- String udfName = SleepMsUDF.class.getName();
- Statement stmt1 = con.createStatement();
- final Statement stmt2 = con2.createStatement();
- Path dataFilePath = new Path(dataFileDir, "kv1.txt");
-
- String tblName = testDbName + "." + tableName;
-
- stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
- stmt1.execute("create table " + tblName + " (int_col int, value string) ");
- stmt1.execute("load data local inpath '" + dataFilePath.toString() + "'
into table " + tblName);
-
-
- stmt1.close();
- final Statement stmt = con.createStatement();
- final ExceptionHolder tExecuteHolder = new ExceptionHolder();
- final ExceptionHolder tKillHolder = new ExceptionHolder();
-
- // Thread executing the query
- Thread tExecute = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- System.out.println("Executing query: ");
- stmt.execute("set hive.llap.execution.mode = none");
-
- // The test table has 500 rows, so total query time should be ~
500*500ms
- stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col,
t2.int_col " +
- "from " + tableName + " t1 join " + tableName + " t2 on
t1.int_col = t2.int_col");
- } catch (SQLException e) {
- tExecuteHolder.throwable = e;
- }
- }
+ public void testTypesNestedInMapWithLimitAndFilters() throws Exception {
+ try (Statement statement = hs2Conn.createStatement()) {
+ statement.execute("CREATE TABLE complex_tbl2(c1 map<int, string>," +
+ " c2 map<int, array<string>>, " +
+ " c3 map<int, struct<f1:string,f2:string>>, c4 int) STORED AS ORC");
+
+ statement.executeUpdate("INSERT INTO complex_tbl2 VALUES " +
+ "(MAP(1, 'a1'), MAP(1, ARRAY('a1', 'a2')), MAP(1,
NAMED_STRUCT('f1','a1', 'f2','a2')), " +
+ "1), " +
+ "(MAP(1, 'b1',2, 'b2'), MAP(1, ARRAY('b1', 'b2'), 2, ARRAY('b3') ),
" +
+ "MAP(1, NAMED_STRUCT('f1','b1', 'f2','b2')), " +
+ "2), " +
+ "(MAP(1, 'c1',2, 'c2'), MAP(1, ARRAY('c1', 'c2'), 2, ARRAY('c3') ),
" +
+ "MAP(1, NAMED_STRUCT('f1','c1', 'f2','c2'), 2, NAMED_STRUCT('f1',
'c3', 'f2', 'c4') ), " +
+ "3)");
+
+ }
+
+ List<Object[]> expected = new ArrayList<>();
+ expected.add(new Object[]{
+ ImmutableMap.of(1, "a1"),
+ ImmutableMap.of(1, asList("a1", "a2")),
+ ImmutableMap.of(1, asList("a1", "a2")),
+ 1,
});
- // Thread killing the query
- Thread tKill = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(5000);
- String queryId = ((HiveStatement) stmt).getQueryId();
- System.out.println("Killing query: " + queryId);
- stmt2.execute("kill query '" + queryId + "'");
- stmt2.close();
- } catch (Exception e) {
- tKillHolder.throwable = e;
- }
- }
+ expected.add(new Object[]{
+ ImmutableMap.of(1, "b1", 2, "b2"),
+ ImmutableMap.of(1, asList("b1", "b2"), 2, asList("b3")),
+ ImmutableMap.of(1, asList("b1", "b2")),
+ 2,
+ });
+ expected.add(new Object[]{
+ ImmutableMap.of(1, "c1", 2, "c2"),
+ ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3")),
+ ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3", "c4")),
+ 3,
});
- tExecute.start();
- tKill.start();
- tExecute.join();
- tKill.join();
- stmt.close();
- con2.close();
- con.close();
- assertNotNull("tExecute", tExecuteHolder.throwable);
- assertNull("tCancel", tKillHolder.throwable);
+ // test without limit and filters (i.e. VectorizedRowBatch#selectedInUse=false)
+ RowCollector2 rowCollector = new RowCollector2();
+ String query = "select * from complex_tbl2";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2));
+
+ // test with filter
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl2 where c4 > 1 ";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(1), expected.get(2));
+
+ // test with limit
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl2 limit 2";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(0), expected.get(1));
+
+ // test with filters and limit
+ rowCollector = new RowCollector2();
+ query = "select * from complex_tbl2 where c4 > 1 limit 1";
+ processQuery(query, 1, rowCollector);
+ verifyResult(rowCollector.rows, expected.get(1));
+
+ }
+
+ private void verifyResult(List<Object[]> actual, Object[]... expected) {
+ assertEquals(expected.length, actual.size());
+ for (int i = 0; i < expected.length; i++) {
+ assertArrayEquals(expected[i], actual.get(i));
+ }
}
}
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
index d9c5666bc40..014e49dafef 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
@@ -39,11 +39,19 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe
private BufferAllocator allocator;
private ArrowStreamReader arrowStreamReader;
+ //Allows client to provide and manage their own arrow BufferAllocator
public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
- JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+ JobConf job, Closeable client, Socket socket, BufferAllocator allocator) throws IOException {
super(in, schema, clazz, job, client, socket);
- allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit);
+ this.allocator = allocator;
this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator);
+ }
+
+ //Use the global arrow BufferAllocator
+ public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
+ JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+ this(in, schema, clazz, job, client, socket,
+ RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit));
}
@Override
@@ -76,6 +84,9 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe
@Override
public void close() throws IOException {
arrowStreamReader.close();
+ //allocator.close() will throw exception unless all buffers have been released
+ //See org.apache.arrow.memory.BaseAllocator.close()
+ allocator.close();
}
}
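Editorial note, not part of the patch: because close() above also closes the allocator, a client-supplied allocator should be dedicated to a single reader and fully drained before the reader is closed. A minimal sketch of that accounting check, mirroring the writer-side pattern this patch adds in LlapArrowRecordWriter; the class and method names are hypothetical.

import org.apache.arrow.memory.BufferAllocator;

class ReaderAllocatorContractSketch {
  // Confirms that every ArrowBuf handed out by this allocator has been released,
  // so that the subsequent allocator.close() cannot throw.
  static void verifyDrained(BufferAllocator readerAllocator) {
    long outstanding = readerAllocator.getAllocatedMemory();
    if (outstanding != 0) {
      throw new IllegalStateException("Arrow memory still referenced: " + outstanding + " bytes");
    }
  }
}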
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
index fafbdee210a..7690599a80c 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
@@ -25,16 +25,28 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import java.util.UUID;
/*
* Adapts an Arrow batch reader to a row reader
+ * Only used for testing
*/
public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> {
private LlapBaseInputFormat baseInputFormat;
public LlapArrowRowInputFormat(long arrowAllocatorLimit) {
- baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit);
+ BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit).newChildAllocator(
+ //allocator name, use UUID for testing
+ UUID.randomUUID().toString(),
+ //No use for reservation, allocators claim memory from the same pool,
+ //but allocate/releases are tracked per-allocator
+ 0,
+ //Limit passed in by client
+ arrowAllocatorLimit);
+ baseInputFormat = new LlapBaseInputFormat(true, allocator);
}
@Override
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 30f372003f0..46e6e024ba2 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.Credentials;
@@ -107,6 +108,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
private String query;
private boolean useArrow;
private long arrowAllocatorLimit;
+ private BufferAllocator allocator;
private final Random rand = new Random();
public static final String URL_KEY = "llap.if.hs2.connection";
@@ -128,11 +130,17 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
this.query = query;
}
+ //Exposed only for testing, clients should use LlapBaseInputFormat(boolean, BufferAllocator) instead
public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
this.useArrow = useArrow;
this.arrowAllocatorLimit = arrowAllocatorLimit;
}
+ public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) {
+ this.useArrow = useArrow;
+ this.allocator = allocator;
+ }
+
public LlapBaseInputFormat() {
this.useArrow = false;
}
@@ -209,10 +217,19 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
@SuppressWarnings("rawtypes")
LlapBaseRecordReader recordReader;
if(useArrow) {
- recordReader = new LlapArrowBatchRecordReader(
- socket.getInputStream(), llapSplit.getSchema(),
- ArrowWrapperWritable.class, job, llapClient, socket,
- arrowAllocatorLimit);
+ if(allocator != null) {
+ //Client provided their own allocator
+ recordReader = new LlapArrowBatchRecordReader(
+ socket.getInputStream(), llapSplit.getSchema(),
+ ArrowWrapperWritable.class, job, llapClient, socket,
+ allocator);
+ } else {
+ //Client did not provide their own allocator, use constructor for global allocator
+ recordReader = new LlapArrowBatchRecordReader(
+ socket.getInputStream(), llapSplit.getSchema(),
+ ArrowWrapperWritable.class, job, llapClient, socket,
+ arrowAllocatorLimit);
+ }
} else {
recordReader = new LlapBaseRecordReader(socket.getInputStream(),
llapSplit.getSchema(), BytesWritable.class, job, llapClient,
(java.io.Closeable)socket);
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
index 1b3a3ebb269..4cd8a61c8f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.hive.llap;
import java.io.IOException;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.NonNullableStructVector;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.io.Writable;
-import java.nio.channels.WritableByteChannel;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
@@ -47,15 +48,28 @@ public class LlapArrowRecordWriter<K extends Writable, V extends Writable>
public static final Logger LOG = LoggerFactory.getLogger(LlapArrowRecordWriter.class);
ArrowStreamWriter arrowStreamWriter;
- WritableByteChannel out;
+ WritableByteChannelAdapter out;
+ BufferAllocator allocator;
+ NonNullableStructVector rootVector;
- public LlapArrowRecordWriter(WritableByteChannel out) {
+ public LlapArrowRecordWriter(WritableByteChannelAdapter out) {
this.out = out;
}
@Override
public void close(Reporter reporter) throws IOException {
- arrowStreamWriter.close();
+ try {
+ arrowStreamWriter.close();
+ } finally {
+ rootVector.close();
+ //bytesLeaked should always be 0
+ long bytesLeaked = allocator.getAllocatedMemory();
+ if(bytesLeaked != 0) {
+ LOG.error("Arrow memory leaked bytes: {}", bytesLeaked);
+ throw new IllegalStateException("Arrow memory leaked bytes:" + bytesLeaked);
+ }
+ allocator.close();
+ }
}
@Override
@@ -64,6 +78,9 @@ public class LlapArrowRecordWriter<K extends Writable, V extends Writable>
if (arrowStreamWriter == null) {
VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out);
+ allocator = arrowWrapperWritable.getAllocator();
+ this.out.setAllocator(allocator);
+ rootVector = arrowWrapperWritable.getRootVector();
}
arrowStreamWriter.writeBatch();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
index 57da1d9f6d8..b07ce5b07c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -18,13 +18,14 @@
package org.apache.hadoop.hive.llap;
-import io.netty.buffer.Unpooled;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.Semaphore;
+import org.apache.arrow.memory.BufferAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ public class WritableByteChannelAdapter implements WritableByteChannel {
private final Semaphore writeResources;
private boolean closed = false;
private final String id;
+ private BufferAllocator allocator;
private ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
@@ -82,12 +84,18 @@ public class WritableByteChannelAdapter implements WritableByteChannel {
this.id = id;
}
+ public void setAllocator(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
@Override
public int write(ByteBuffer src) throws IOException {
int size = src.remaining();
//Down the semaphore or block until available
takeWriteResources(1);
- chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener);
+ ByteBuf buf = allocator.buffer(size);
+ buf.writeBytes(src);
+ chc.writeAndFlush(buf).addListener(writeListener);
return size;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
new file mode 100644
index 00000000000..1603703ec7e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.filesink;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.hive.llap.LlapOutputFormatService;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.hadoop.hive.ql.io.arrow.Serializer;
+import static org.apache.hadoop.hive.llap.LlapOutputFormat.LLAP_OF_ID_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.core.layout.AbstractStringLayout;
+
+/**
+ * Native Vectorized File Sink operator implementation for Arrow.
+ * Assumes output to LlapOutputFormatService
+ **/
+public class VectorFileSinkArrowOperator extends TerminalOperator<FileSinkDesc>
+ implements Serializable, VectorizationOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ private VectorizationContext vContext;
+ private VectorFileSinkDesc vectorDesc;
+ public static final Logger LOG = LoggerFactory.getLogger(VectorFileSinkArrowOperator.class.getName());
+
+ // The above members are initialized by the constructor and must not be
+ // transient.
+ //---------------------------------------------------------------------------
+
+ private transient Serializer converter;
+ private transient RecordWriter recordWriter;
+ private transient boolean wroteData;
+ private transient String attemptId;
+
+ public VectorFileSinkArrowOperator(CompilationOpContext ctx, OperatorDesc conf,
+ VectorizationContext vContext, VectorDesc vectorDesc) {
+ this(ctx);
+ this.conf = (FileSinkDesc) conf;
+ this.vContext = vContext;
+ this.vectorDesc = (VectorFileSinkDesc) vectorDesc;
+ }
+
+ /** Kryo ctor. */
+ @VisibleForTesting
+ public VectorFileSinkArrowOperator() {
+ super();
+ }
+
+ public VectorFileSinkArrowOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
+ @Override
+ public VectorizationContext getInputVectorizationContext() {
+ return vContext;
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ super.initializeOp(hconf);
+ //attemptId identifies a RecordWriter initialized by LlapOutputFormatService
+ this.attemptId = hconf.get(LLAP_OF_ID_KEY);
+ try {
+ //Initialize column names and types
+ List<TypeInfo> typeInfos = new ArrayList<>();
+ List<String> fieldNames = new ArrayList<>();
+ StructObjectInspector schema = (StructObjectInspector) inputObjInspectors[0];
+ for(int i = 0; i < schema.getAllStructFieldRefs().size(); i++) {
+ StructField structField = schema.getAllStructFieldRefs().get(i);
+ fieldNames.add(structField.getFieldName());
+ TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(structField.getFieldObjectInspector());
+ typeInfos.add(typeInfo);
+ }
+ //Initialize an Arrow serializer
+ converter = new Serializer(hconf, attemptId, typeInfos, fieldNames);
+ } catch (Exception e) {
+ LOG.error("Unable to initialize VectorFileSinkArrowOperator");
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void process(Object data, int tag) throws HiveException {
+ //ArrowStreamReader expects at least the schema metadata, if this op writes no data,
+ //we need to send the schema to close the stream gracefully
+ VectorizedRowBatch batch = (VectorizedRowBatch) data;
+ try {
+ if(recordWriter == null) {
+ recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId);
+ }
+ //Convert the VectorizedRowBatch to a handle for the Arrow batch
+ ArrowWrapperWritable writable = converter.serializeBatch(batch, true);
+ //Pass the handle to the LlapOutputFormatService recordWriter
+ recordWriter.write(null, writable);
+ this.wroteData = true;
+ } catch(Exception e) {
+ LOG.error("Failed to convert VectorizedRowBatch to Arrow batch");
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ try {
+ if(!wroteData) {
+ //Send a schema only batch to signal EOS with no data written
+ ArrowWrapperWritable writable = converter.emptyBatch();
+ if(recordWriter == null) {
+ recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId);
+ }
+ recordWriter.write(null, writable);
+ }
+ } catch(Exception e) {
+ LOG.error("Failed to write Arrow stream schema");
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ //Close the recordWriter with null Reporter
+ recordWriter.close(null);
+ } catch(Exception e) {
+ LOG.error("Failed to close Arrow stream");
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public VectorDesc getVectorDesc() {
+ return vectorDesc;
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.FILESINK;
+ }
+}
+
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
index a1a3327814c..dac7d9c18a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
@@ -104,7 +104,6 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
public void initialize(Configuration conf, Properties tbl) throws SerDeException {
this.conf = conf;
- rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
@@ -134,8 +133,6 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
fields.add(toField(columnNames.get(i), columnTypes.get(i)));
}
- serializer = new Serializer(this);
- deserializer = new Deserializer(this);
}
private static Field toField(String name, TypeInfo typeInfo) {
@@ -257,6 +254,15 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
@Override
public ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) {
+ if(serializer == null) {
+ try {
+ rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
+ serializer = new Serializer(this);
+ } catch(Exception e) {
+ LOG.error("Unable to initialize serializer for ArrowColumnarBatchSerDe");
+ throw new RuntimeException(e);
+ }
+ }
return serializer.serialize(obj, objInspector);
}
@@ -267,6 +273,14 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
@Override
public Object deserialize(Writable writable) {
+ if(deserializer == null) {
+ try {
+ rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
+ deserializer = new Deserializer(this);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
return deserializer.deserialize(writable);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
index dd490b1b909..53bee6b823f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.hive.ql.io.arrow;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.arrow.vector.complex.NonNullableStructVector;
import java.io.DataInput;
import java.io.DataOutput;
@@ -27,10 +29,19 @@ import java.io.IOException;
public class ArrowWrapperWritable implements WritableComparable {
private VectorSchemaRoot vectorSchemaRoot;
+ private BufferAllocator allocator;
+ private NonNullableStructVector rootVector;
public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
this.vectorSchemaRoot = vectorSchemaRoot;
}
+
+ public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot, BufferAllocator allocator, NonNullableStructVector rootVector) {
+ this.vectorSchemaRoot = vectorSchemaRoot;
+ this.allocator = allocator;
+ this.rootVector = rootVector;
+ }
+
public ArrowWrapperWritable() {}
public VectorSchemaRoot getVectorSchemaRoot() {
@@ -41,6 +52,14 @@ public class ArrowWrapperWritable implements WritableComparable {
this.vectorSchemaRoot = vectorSchemaRoot;
}
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
+ public NonNullableStructVector getRootVector() {
+ return rootVector;
+ }
+
@Override
public void write(DataOutput dataOutput) throws IOException {
throw new UnsupportedOperationException();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 7a432ac836b..5289b7c1efd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -38,30 +38,37 @@ import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.NonNullableStructVector;
import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.holders.DecimalHolder;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -69,11 +76,15 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.arrow.memory.BufferAllocator;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_ALLOCATOR_LIMIT;
import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS;
import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
@@ -85,31 +96,61 @@ import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStruc
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector;
-class Serializer {
+public class Serializer {
private final int MAX_BUFFERED_ROWS;
-
- // Schema
- private final StructTypeInfo structTypeInfo;
- private final int fieldSize;
+ private final static byte[] EMPTY_BYTES = new byte[0];
// Hive columns
private final VectorizedRowBatch vectorizedRowBatch;
private final VectorAssignRow vectorAssignRow;
private int batchSize;
+ private BufferAllocator allocator;
+ private List<TypeInfo> fieldTypeInfos;
+ private List<String> fieldNames;
+ private int fieldSize;
private final StructVector rootVector;
+ private final DecimalHolder decimalHolder = new DecimalHolder();
+
+ //Constructor for non-serde serialization
+ public Serializer(Configuration conf, String attemptId, List<TypeInfo> typeInfos, List<String> fieldNames) {
+ this.fieldTypeInfos = typeInfos;
+ this.fieldNames = fieldNames;
+ long childAllocatorLimit = HiveConf.getLongVar(conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
+ //Use per-task allocator for accounting only, no need to reserve per-task memory
+ long childAllocatorReservation = 0L;
+ //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed
+ allocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf).newChildAllocator(
+ attemptId,
+ childAllocatorReservation,
+ childAllocatorLimit);
+ rootVector = StructVector.empty(null, allocator);
+ //These last fields are unused in non-serde usage
+ vectorizedRowBatch = null;
+ vectorAssignRow = null;
+ MAX_BUFFERED_ROWS = 0;
+ }
Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
+ long childAllocatorLimit = HiveConf.getLongVar(serDe.conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of buffered columns: " + MAX_BUFFERED_ROWS);
+ String childAllocatorName = Thread.currentThread().getName();
+ //Use per-task allocator for accounting only, no need to reserve per-task memory
+ long childAllocatorReservation = 0L;
+ //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed
+ allocator = serDe.rootAllocator.newChildAllocator(
+ childAllocatorName,
+ childAllocatorReservation,
+ childAllocatorLimit);
// Schema
- structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
- List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ StructTypeInfo structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
+ fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ fieldNames = structTypeInfo.getAllStructFieldNames();
fieldSize = fieldTypeInfos.size();
-
// Init Arrow stuffs
- rootVector = StructVector.empty(null, serDe.rootAllocator);
+ rootVector = StructVector.empty(null, allocator);
// Init Hive stuffs
vectorizedRowBatch = new VectorizedRowBatch(fieldSize);
@@ -127,33 +168,66 @@ class Serializer {
}
}
- private ArrowWrapperWritable serializeBatch() {
+ //Construct an emptyBatch which contains schema-only info
+ public ArrowWrapperWritable emptyBatch() {
+ rootVector.setValueCount(0);
+ for (int fieldIndex = 0; fieldIndex < fieldTypeInfos.size(); fieldIndex++) {
+ final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+ final String fieldName = fieldNames.get(fieldIndex);
+ final FieldType fieldType = toFieldType(fieldTypeInfo);
+ final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class);
+ arrowVector.setInitialCapacity(0);
+ arrowVector.allocateNew();
+ }
+ VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
+ return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
+ }
+
+ //Used for both:
+ //1. VectorizedRowBatch constructed by batching rows
+ //2. VectorizedRowBatch provided from upstream (isNative)
+ public ArrowWrapperWritable serializeBatch(VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
rootVector.setValueCount(0);
for (int fieldIndex = 0; fieldIndex < vectorizedRowBatch.projectionSize; fieldIndex++) {
final int projectedColumn = vectorizedRowBatch.projectedColumns[fieldIndex];
final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn];
- final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex);
- final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex);
+ final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+ final String fieldName = fieldNames.get(fieldIndex);
final FieldType fieldType = toFieldType(fieldTypeInfo);
+ //Reuse existing FieldVector buffers
+ //since we always call setValue or setNull for each row
+ boolean fieldExists = false;
+ if(rootVector.getChild(fieldName) != null) {
+ fieldExists = true;
+ }
final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class);
- arrowVector.setInitialCapacity(batchSize);
- arrowVector.allocateNew();
- write(arrowVector, hiveVector, fieldTypeInfo, batchSize);
+ if(fieldExists) {
+ arrowVector.setValueCount(isNative ? vectorizedRowBatch.size : batchSize);
+ } else {
+ arrowVector.setInitialCapacity(isNative ? vectorizedRowBatch.size : batchSize);
+ arrowVector.allocateNew();
+ }
+ write(arrowVector, hiveVector, fieldTypeInfo, isNative ? vectorizedRowBatch.size : batchSize, vectorizedRowBatch, isNative);
+ }
+ if(!isNative) {
+ //Only mutate batches that are constructed by this serde
+ vectorizedRowBatch.reset();
+ rootVector.setValueCount(batchSize);
+ } else {
+ rootVector.setValueCount(vectorizedRowBatch.size);
}
- vectorizedRowBatch.reset();
- rootVector.setValueCount(batchSize);
batchSize = 0;
VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
- return new ArrowWrapperWritable(vectorSchemaRoot);
+ return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
}
- private FieldType toFieldType(TypeInfo typeInfo) {
+ private static FieldType toFieldType(TypeInfo typeInfo) {
return new FieldType(true, toArrowType(typeInfo), null);
}
- private ArrowType toArrowType(TypeInfo typeInfo) {
+ private static ArrowType toArrowType(TypeInfo typeInfo) {
switch (typeInfo.getCategory()) {
case PRIMITIVE:
switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
@@ -207,38 +281,43 @@ class Serializer {
}
}
- private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size) {
+ private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size,
+ VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
switch (typeInfo.getCategory()) {
case PRIMITIVE:
- writePrimitive(arrowVector, hiveVector, typeInfo, size);
+ writePrimitive(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative);
break;
case LIST:
- writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size);
+ writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
break;
case STRUCT:
- writeStruct((NonNullableStructVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size);
+ writeStruct((NonNullableStructVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
break;
case UNION:
- writeUnion(arrowVector, hiveVector, typeInfo, size);
+ writeUnion(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative);
break;
case MAP:
- writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size);
+ writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
break;
default:
throw new IllegalArgumentException();
- }
+ }
}
private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo,
- int size) {
+ int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo);
final ListColumnVector structListVector = toStructListVector(hiveVector);
- write(arrowVector, structListVector, structListTypeInfo, size);
+ write(arrowVector, structListVector, structListTypeInfo, size, vectorizedRowBatch, isNative);
final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
for (int rowIndex = 0; rowIndex < size; rowIndex++) {
- if (hiveVector.isNull[rowIndex]) {
+ int selectedIndex = rowIndex;
+ if (vectorizedRowBatch.selectedInUse) {
+ selectedIndex = vectorizedRowBatch.selected[rowIndex];
+ }
+ if (hiveVector.isNull[selectedIndex]) {
BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0);
} else {
BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex);
@@ -247,7 +326,7 @@ class Serializer {
}
private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
- int size) {
+ int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector;
@@ -257,11 +336,11 @@ class Serializer {
final ColumnVector hiveObjectVector = hiveObjectVectors[tag];
final TypeInfo objectTypeInfo = objectTypeInfos.get(tag);
- write(arrowVector, hiveObjectVector, objectTypeInfo, size);
+ write(arrowVector, hiveObjectVector, objectTypeInfo, size, vectorizedRowBatch, isNative);
}
private void writeStruct(NonNullableStructVector arrowVector, StructColumnVector hiveVector,
- StructTypeInfo typeInfo, int size) {
+ StructTypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final List<String> fieldNames = typeInfo.getAllStructFieldNames();
final List<TypeInfo> fieldTypeInfos =
typeInfo.getAllStructFieldTypeInfos();
final ColumnVector[] hiveFieldVectors = hiveVector.fields;
@@ -291,7 +370,7 @@ class Serializer {
toFieldType(fieldTypeInfos.get(fieldIndex)), FieldVector.class);
arrowFieldVector.setInitialCapacity(size);
arrowFieldVector.allocateNew();
- write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size);
+ write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size,
vectorizedRowBatch, isNative);
}
final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
@@ -304,235 +383,332 @@ class Serializer {
}
}
- private void writeList(ListVector arrowVector, ListColumnVector hiveVector,
ListTypeInfo typeInfo,
- int size) {
+  // selected[] points to the valid/filtered/selected records at row level.
+  // For a MultiValuedColumnVector such as ListColumnVector, one record of the vector points to multiple nested records.
+  // In child vectors we get these records in exploded form, i.e. a child vector can hold more records
+  // than the actual VectorizedRowBatch size, so selected[] also needs to be readjusted.
+  // This method creates a shallow copy of the VectorizedRowBatch with corrected size and selected[].
+
+  private static VectorizedRowBatch correctSelectedAndSize(VectorizedRowBatch sourceVrb,
+      MultiValuedColumnVector multiValuedColumnVector) {
+
+    VectorizedRowBatch vrb = new VectorizedRowBatch(sourceVrb.numCols, sourceVrb.size);
+    vrb.cols = sourceVrb.cols;
+    vrb.endOfFile = sourceVrb.endOfFile;
+    vrb.projectedColumns = sourceVrb.projectedColumns;
+    vrb.projectionSize = sourceVrb.projectionSize;
+    vrb.selectedInUse = sourceVrb.selectedInUse;
+    vrb.setPartitionInfo(sourceVrb.getDataColumnCount(), sourceVrb.getPartitionColumnCount());
+
+ int correctedSize = 0;
+ final int[] srcVrbSelected = sourceVrb.selected;
+ for (int i = 0; i < sourceVrb.size; i++) {
+ correctedSize += multiValuedColumnVector.lengths[srcVrbSelected[i]];
+ }
+
+ int newIndex = 0;
+ final int[] selectedOffsetsCorrected = new int[correctedSize];
+ for (int i = 0; i < sourceVrb.size; i++) {
+ long elementIndex = multiValuedColumnVector.offsets[srcVrbSelected[i]];
+ long elementSize = multiValuedColumnVector.lengths[srcVrbSelected[i]];
+ for (int j = 0; j < elementSize; j++) {
+ selectedOffsetsCorrected[newIndex++] = (int) (elementIndex + j);
+ }
+ }
+ vrb.selected = selectedOffsetsCorrected;
+ vrb.size = correctedSize;
+ return vrb;
+ }
+
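To make the readjustment above concrete, here is a minimal, self-contained sketch of the same expansion over plain arrays; the method name and parameters are illustrative only and are not part of the Hive code.

  // Expands a row-level selected[] into child-level indices for a list column
  // described by offsets[] and lengths[], mirroring correctSelectedAndSize above.
  static int[] expandSelectedForListChildren(int[] selected, int size, long[] offsets, long[] lengths) {
    int correctedSize = 0;
    for (int i = 0; i < size; i++) {
      correctedSize += lengths[selected[i]];
    }
    int[] childSelected = new int[correctedSize];
    int next = 0;
    for (int i = 0; i < size; i++) {
      long start = offsets[selected[i]];
      long len = lengths[selected[i]];
      for (int j = 0; j < len; j++) {
        childSelected[next++] = (int) (start + j);
      }
    }
    return childSelected;
  }

For example, with selected = {0, 2}, offsets = {0, 2, 3} and lengths = {2, 1, 3}, the child-level selection becomes {0, 1, 3, 4, 5}.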
+ private void writeList(ListVector arrowVector, ListColumnVector hiveVector,
ListTypeInfo typeInfo, int size,
+ VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final int OFFSET_WIDTH = 4;
final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
final ColumnVector hiveElementVector = hiveVector.child;
final FieldVector arrowElementVector =
- (FieldVector)
arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
- arrowElementVector.setInitialCapacity(hiveVector.childCount);
+ (FieldVector)
arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
+
+ VectorizedRowBatch correctedVrb = vectorizedRowBatch;
+ int correctedSize = hiveVector.childCount;
+ if (vectorizedRowBatch.selectedInUse) {
+ correctedVrb = correctSelectedAndSize(vectorizedRowBatch, hiveVector);
+ correctedSize = correctedVrb.size;
+ }
+ arrowElementVector.setInitialCapacity(correctedSize);
arrowElementVector.allocateNew();
- write(arrowElementVector, hiveElementVector, elementTypeInfo,
hiveVector.childCount);
+ write(arrowElementVector, hiveElementVector, elementTypeInfo,
correctedSize, correctedVrb, isNative);
final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer();
int nextOffset = 0;
for (int rowIndex = 0; rowIndex < size; rowIndex++) {
- if (hiveVector.isNull[rowIndex]) {
+ int selectedIndex = rowIndex;
+ if (vectorizedRowBatch.selectedInUse) {
+ selectedIndex = vectorizedRowBatch.selected[rowIndex];
+ }
+ if (hiveVector.isNull[selectedIndex]) {
offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
} else {
offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
- nextOffset += (int) hiveVector.lengths[rowIndex];
+ nextOffset += (int) hiveVector.lengths[selectedIndex];
arrowVector.setNotNull(rowIndex);
}
}
offsetBuffer.setInt(size * OFFSET_WIDTH, nextOffset);
}
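The offset handling in writeList can also be read in isolation. The following is a simplified sketch over plain arrays (an int[] stands in for the Arrow offset buffer), so the names are illustrative rather than Hive or Arrow APIs.

  // Computes Arrow list offsets for `size` rows, honoring selected[] when it is in use;
  // a null row keeps the running offset, i.e. contributes a zero-length entry.
  static int[] computeListOffsets(int size, boolean selectedInUse, int[] selected,
      boolean[] isNull, long[] lengths) {
    int[] offsets = new int[size + 1];
    int nextOffset = 0;
    for (int row = 0; row < size; row++) {
      int src = selectedInUse ? selected[row] : row;
      offsets[row] = nextOffset;
      if (!isNull[src]) {
        nextOffset += (int) lengths[src];
      }
    }
    offsets[size] = nextOffset; // trailing offset closes the last list
    return offsets;
  }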
-  private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
-      int size) {
+  //Handle cases for both internally constructed
+  //and externally provided (isNative) VectorizedRowBatch
+  private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size,
+      VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
switch (primitiveCategory) {
- case BOOLEAN:
- {
- final BitVector bitVector = (BitVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- bitVector.setNull(i);
- } else {
- bitVector.set(i, (int) ((LongColumnVector)
hiveVector).vector[i]);
- }
- }
+ case BOOLEAN:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, boolNullSetter,
boolValueSetter, typeInfo);
+ return;
+ }
+ final BitVector bitVector = (BitVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ boolNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ boolValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case BYTE:
- {
- final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- tinyIntVector.setNull(i);
- } else {
- tinyIntVector.set(i, (byte) ((LongColumnVector)
hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case BYTE:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, byteNullSetter,
byteValueSetter, typeInfo);
+ return;
+ }
+ final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ byteNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ byteValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case SHORT:
- {
- final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- smallIntVector.setNull(i);
- } else {
- smallIntVector.set(i, (short) ((LongColumnVector)
hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case SHORT:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, shortNullSetter,
shortValueSetter, typeInfo);
+ return;
+ }
+ final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ shortNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ shortValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case INT:
- {
- final IntVector intVector = (IntVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- intVector.setNull(i);
- } else {
- intVector.set(i, (int) ((LongColumnVector)
hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case INT:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intNullSetter,
intValueSetter, typeInfo);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ intNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ intValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case LONG:
- {
- final BigIntVector bigIntVector = (BigIntVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- bigIntVector.setNull(i);
- } else {
- bigIntVector.set(i, ((LongColumnVector) hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case LONG:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, longNullSetter,
longValueSetter, typeInfo);
+ return;
+ }
+ final BigIntVector bigIntVector = (BigIntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ longNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ longValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case FLOAT:
- {
- final Float4Vector float4Vector = (Float4Vector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- float4Vector.setNull(i);
- } else {
- float4Vector.set(i, (float) ((DoubleColumnVector)
hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case FLOAT:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, floatNullSetter,
floatValueSetter, typeInfo);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ floatNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ floatValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case DOUBLE:
- {
- final Float8Vector float8Vector = (Float8Vector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- float8Vector.setNull(i);
- } else {
- float8Vector.set(i, ((DoubleColumnVector) hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case DOUBLE:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected,
doubleNullSetter, doubleValueSetter, typeInfo);
+ return;
+ }
+ final Float8Vector float8Vector = (Float8Vector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ doubleNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ doubleValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case STRING:
- case VARCHAR:
- case CHAR:
- {
- final VarCharVector varCharVector = (VarCharVector) arrowVector;
- final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- varCharVector.setNull(i);
- } else {
- varCharVector.setSafe(i, bytesVector.vector[i],
bytesVector.start[i], bytesVector.length[i]);
- }
- }
+ }
+ }
+ break;
+ case CHAR:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, charNullSetter,
charValueSetter, typeInfo);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ charNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ charValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case DATE:
- {
- final DateDayVector dateDayVector = (DateDayVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- dateDayVector.setNull(i);
- } else {
- dateDayVector.set(i, (int) ((LongColumnVector)
hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case STRING:
+ case VARCHAR:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected,
stringNullSetter, stringValueSetter, typeInfo);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ stringNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ stringValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case TIMESTAMP:
- {
- final TimeStampMicroTZVector timeStampMicroTZVector =
(TimeStampMicroTZVector) arrowVector;
- final TimestampColumnVector timestampColumnVector =
(TimestampColumnVector) hiveVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- timeStampMicroTZVector.setNull(i);
- } else {
- // Time = second + sub-second
- final long secondInMillis = timestampColumnVector.getTime(i);
- final long secondInMicros = (secondInMillis - secondInMillis %
MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
- final long subSecondInMicros = timestampColumnVector.getNanos(i)
/ NS_PER_MICROS;
-
- if ((secondInMillis > 0 && secondInMicros < 0) ||
(secondInMillis < 0 && secondInMicros > 0)) {
- // If the timestamp cannot be represented in long microsecond,
set it as a null value
- timeStampMicroTZVector.setNull(i);
- } else {
- timeStampMicroTZVector.set(i, secondInMicros +
subSecondInMicros);
- }
- }
- }
+ }
+ }
+ break;
+ case DATE:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, dateNullSetter,
dateValueSetter, typeInfo);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ dateNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ dateValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case BINARY:
- {
- final VarBinaryVector varBinaryVector = (VarBinaryVector)
arrowVector;
- final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- varBinaryVector.setNull(i);
- } else {
- varBinaryVector.setSafe(i, bytesVector.vector[i],
bytesVector.start[i], bytesVector.length[i]);
- }
- }
+ }
+ }
+ break;
+ case TIMESTAMP:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected,
timestampNullSetter, timestampValueSetter, typeInfo);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ timestampNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ timestampValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case DECIMAL:
- {
- final DecimalVector decimalVector = (DecimalVector) arrowVector;
- final int scale = decimalVector.getScale();
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- decimalVector.setNull(i);
- } else {
- decimalVector.set(i,
- ((DecimalColumnVector)
hiveVector).vector[i].getHiveDecimal().bigDecimalValue().setScale(scale));
- }
- }
+ }
+ }
+ break;
+ case BINARY:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected,
binaryNullSetter, binaryValueSetter, typeInfo);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ binaryNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ binaryValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
}
- break;
- case INTERVAL_YEAR_MONTH:
- {
- final IntervalYearVector intervalYearVector = (IntervalYearVector)
arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- intervalYearVector.setNull(i);
- } else {
- intervalYearVector.set(i, (int) ((LongColumnVector)
hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case DECIMAL:
+ {
+ if(isNative) {
+ if(hiveVector instanceof DecimalColumnVector) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected,
decimalNullSetter, decimalValueSetter, typeInfo);
+ } else {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected,
decimalNullSetter, decimal64ValueSetter, typeInfo);
}
- break;
- case INTERVAL_DAY_TIME:
- {
- final IntervalDayVector intervalDayVector = (IntervalDayVector)
arrowVector;
- final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
- (IntervalDayTimeColumnVector) hiveVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- intervalDayVector.setNull(i);
- } else {
- final long totalSeconds =
intervalDayTimeColumnVector.getTotalSeconds(i);
- final long days = totalSeconds / SECOND_PER_DAY;
- final long millis =
- (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
- intervalDayTimeColumnVector.getNanos(i) / NS_PER_MILLIS;
- intervalDayVector.set(i, (int) days, (int) millis);
- }
- }
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ decimalNullSetter.accept(i, arrowVector, hiveVector);
+ } else if(hiveVector instanceof DecimalColumnVector) {
+ decimalValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
+ } else if(hiveVector instanceof Decimal64ColumnVector) {
+ decimal64ValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
+ } else {
+          throw new IllegalArgumentException("Unsupported vector column type: " + hiveVector.getClass().getName());
}
- break;
- case VOID:
- case UNKNOWN:
- case TIMESTAMPLOCALTZ:
- default:
- throw new IllegalArgumentException();
+ }
+ }
+ break;
+ case INTERVAL_YEAR_MONTH:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected,
intervalYearMonthNullSetter, intervalYearMonthValueSetter, typeInfo);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ intervalYearMonthNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ intervalYearMonthValueSetter.accept(i, i, arrowVector, hiveVector,
typeInfo);
+ }
+ }
+ }
+ break;
+ case INTERVAL_DAY_TIME:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size,
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected,
intervalDayTimeNullSetter, intervalDayTimeValueSetter, typeInfo);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ intervalDayTimeNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ intervalDayTimeValueSetter.accept(i, i, arrowVector, hiveVector,
typeInfo);
+ }
+ }
+ }
+ break;
+ case VOID:
+ case UNKNOWN:
+ case TIMESTAMPLOCALTZ:
+ default:
+ throw new IllegalArgumentException();
}
}
@@ -540,7 +716,7 @@ class Serializer {
// if row is null, it means there are no more rows (closeOp()).
// another case can be that the buffer is full.
if (obj == null) {
- return serializeBatch();
+ return serializeBatch(vectorizedRowBatch, false);
}
List<Object> standardObjects = new ArrayList<Object>();
ObjectInspectorUtils.copyToStandardObject(standardObjects, obj,
@@ -549,8 +725,241 @@ class Serializer {
vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects,
fieldSize);
batchSize++;
if (batchSize == MAX_BUFFERED_ROWS) {
- return serializeBatch();
+ return serializeBatch(vectorizedRowBatch, false);
}
return null;
}
+
+ //Use a provided nullSetter and valueSetter function to populate
+ //fieldVector from hiveVector
+  private static void writeGeneric(final FieldVector fieldVector, final ColumnVector hiveVector,
+      final int size, final boolean selectedInUse, final int[] selected,
+      final IntAndVectorsConsumer nullSetter, final IntIntAndVectorsConsumer valueSetter, TypeInfo typeInfo)
+  {
+ final boolean[] inputIsNull = hiveVector.isNull;
+ final int[] sel = selected;
+
+ if (hiveVector.isRepeating) {
+ if (hiveVector.noNulls || !inputIsNull[0]) {
+ for(int i = 0; i < size; i++) {
+ //Fill n rows with value in row 0
+ valueSetter.accept(i, 0, fieldVector, hiveVector, typeInfo);
+ }
+ } else {
+ for(int i = 0; i < size; i++) {
+ //Fill n rows with NULL
+ nullSetter.accept(i, fieldVector, hiveVector);
+ }
+ }
+ return;
+ }
+
+ if (hiveVector.noNulls) {
+ if (selectedInUse) {
+ for(int logical = 0; logical < size; logical++) {
+ final int batchIndex = sel[logical];
+ //Add row batchIndex
+ valueSetter.accept(logical, batchIndex, fieldVector, hiveVector,
typeInfo);
+ }
+ } else {
+ for(int batchIndex = 0; batchIndex < size; batchIndex++) {
+ //Add row batchIndex
+ valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector,
typeInfo);
+ }
+ }
+ } else {
+ if (selectedInUse) {
+ for(int logical = 0; logical < size; logical++) {
+ final int batchIndex = sel[logical];
+ if (inputIsNull[batchIndex]) {
+ //Add NULL
+ nullSetter.accept(batchIndex, fieldVector, hiveVector);
+ } else {
+ //Add row batchIndex
+ valueSetter.accept(logical, batchIndex, fieldVector, hiveVector,
typeInfo);
+ }
+ }
+ } else {
+ for(int batchIndex = 0; batchIndex < size; batchIndex++) {
+ if (inputIsNull[batchIndex]) {
+ //Add NULL
+ nullSetter.accept(batchIndex, fieldVector, hiveVector);
+ } else {
+ //Add row batchIndex
+ valueSetter.accept(batchIndex, batchIndex, fieldVector,
hiveVector, typeInfo);
+ }
+ }
+ }
+ }
+ }
+
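The traversal above follows the usual VectorizedRowBatch access pattern (isRepeating, noNulls, selectedInUse). A simplified, self-contained sketch of that pattern, using stand-in functional interfaces instead of the Arrow-specific ones, looks like this; it is not a line-for-line copy of writeGeneric.

  interface NullSetter { void set(int targetIndex); }
  interface ValueSetter { void set(int targetIndex, int sourceIndex); }

  // Simplified copy loop: target rows are written densely (0..size-1) while source rows
  // are read through selected[] when selectedInUse is true.
  static void copyColumn(int size, boolean isRepeating, boolean noNulls, boolean[] isNull,
      boolean selectedInUse, int[] selected, NullSetter nullSetter, ValueSetter valueSetter) {
    if (isRepeating) {
      for (int i = 0; i < size; i++) {
        if (noNulls || !isNull[0]) {
          valueSetter.set(i, 0); // every row repeats source row 0
        } else {
          nullSetter.set(i);     // every row is NULL
        }
      }
      return;
    }
    for (int logical = 0; logical < size; logical++) {
      int batchIndex = selectedInUse ? selected[logical] : logical;
      if (!noNulls && isNull[batchIndex]) {
        nullSetter.set(logical);
      } else {
        valueSetter.set(logical, batchIndex);
      }
    }
  }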
+ //nullSetters and valueSetter for each type
+
+ //bool
+ private static final IntAndVectorsConsumer boolNullSetter = (i, arrowVector,
hiveVector)
+ -> ((BitVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer boolValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> ((BitVector) arrowVector).set(i, (int) ((LongColumnVector)
hiveVector).vector[j]);
+
+ //byte
+ private static final IntAndVectorsConsumer byteNullSetter = (i, arrowVector,
hiveVector)
+ -> ((TinyIntVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer byteValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> ((TinyIntVector) arrowVector).set(i, (byte) ((LongColumnVector)
hiveVector).vector[j]);
+
+ //short
+ private static final IntAndVectorsConsumer shortNullSetter = (i,
arrowVector, hiveVector)
+ -> ((SmallIntVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer shortValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> ((SmallIntVector) arrowVector).set(i, (short) ((LongColumnVector)
hiveVector).vector[j]);
+
+ //int
+ private static final IntAndVectorsConsumer intNullSetter = (i, arrowVector,
hiveVector)
+ -> ((IntVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer intValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> ((IntVector) arrowVector).set(i, (int) ((LongColumnVector)
hiveVector).vector[j]);
+
+ //long
+ private static final IntAndVectorsConsumer longNullSetter = (i, arrowVector,
hiveVector)
+ -> ((BigIntVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer longValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> ((BigIntVector) arrowVector).set(i, ((LongColumnVector)
hiveVector).vector[j]);
+
+ //float
+ private static final IntAndVectorsConsumer floatNullSetter = (i,
arrowVector, hiveVector)
+ -> ((Float4Vector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer floatValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> ((Float4Vector) arrowVector).set(i, (float) ((DoubleColumnVector)
hiveVector).vector[j]);
+
+ //double
+ private static final IntAndVectorsConsumer doubleNullSetter = (i,
arrowVector, hiveVector)
+ -> ((Float8Vector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer doubleValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> ((Float8Vector) arrowVector).set(i, ((DoubleColumnVector)
hiveVector).vector[j]);
+
+ //string/varchar
+ private static final IntAndVectorsConsumer stringNullSetter = (i,
arrowVector, hiveVector)
+ -> ((VarCharVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer stringValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> {
+ BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+ ((VarCharVector) arrowVector).setSafe(i, bytesVector.vector[j],
bytesVector.start[j], bytesVector.length[j]);
+ };
+
+ //fixed-length CHAR
+ private static final IntAndVectorsConsumer charNullSetter = (i, arrowVector,
hiveVector)
+ -> ((VarCharVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer charValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> {
+ BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+ VarCharVector varCharVector = (VarCharVector) arrowVector;
+ byte[] bytes = bytesVector.vector[j];
+ int length = bytesVector.length[j];
+ int start = bytesVector.start[j];
+
+ if (bytes == null) {
+ bytes = EMPTY_BYTES;
+ start = 0;
+ length = 0;
+ }
+
+ final CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo;
+ final int paddedLength = charTypeInfo.getLength();
+ final byte[] paddedBytes = StringExpr.padRight(bytes, start, length,
paddedLength);
+ varCharVector.setSafe(i, paddedBytes, 0, paddedBytes.length);
+ };
+
+ //date
+ private static final IntAndVectorsConsumer dateNullSetter = (i, arrowVector,
hiveVector)
+ -> ((DateDayVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer dateValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> ((DateDayVector) arrowVector).set(i, (int) ((LongColumnVector)
hiveVector).vector[j]);
+
+ //timestamp
+ private static final IntAndVectorsConsumer timestampNullSetter = (i,
arrowVector, hiveVector)
+ -> ((TimeStampMicroTZVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer timestampValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> {
+ final TimeStampMicroTZVector timeStampMicroTZVector =
(TimeStampMicroTZVector) arrowVector;
+ final TimestampColumnVector timestampColumnVector =
(TimestampColumnVector) hiveVector;
+ // Time = second + sub-second
+ final long secondInMillis = timestampColumnVector.getTime(j);
+ final long secondInMicros = (secondInMillis - secondInMillis %
MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
+ final long subSecondInMicros = timestampColumnVector.getNanos(j) /
NS_PER_MICROS;
+ if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 &&
secondInMicros > 0)) {
+ // If the timestamp cannot be represented in long microsecond, set it as
a null value
+ timeStampMicroTZVector.setNull(i);
+ } else {
+ timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros);
+ }
+ };
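The timestamp conversion above can be summarized by a small helper; this is a sketch with a made-up name, shown only to spell out the millis/nanos-to-micros arithmetic and the overflow-to-null rule.

  // Converts epoch milliseconds plus the nanosecond remainder to epoch microseconds.
  // Returns null when the whole-second part overflows a long in microseconds.
  static Long toEpochMicros(long epochMillis, int nanosOfSecond) {
    final long millisPerSecond = 1_000L;
    final long microsPerMilli = 1_000L;
    final long nanosPerMicro = 1_000L;
    long secondInMicros = (epochMillis - epochMillis % millisPerSecond) * microsPerMilli;
    long subSecondInMicros = nanosOfSecond / nanosPerMicro;
    if ((epochMillis > 0 && secondInMicros < 0) || (epochMillis < 0 && secondInMicros > 0)) {
      return null; // a sign flip means the multiplication overflowed
    }
    return secondInMicros + subSecondInMicros;
  }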
+
+ //binary
+ private static final IntAndVectorsConsumer binaryNullSetter = (i,
arrowVector, hiveVector)
+ -> ((VarBinaryVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer binaryValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> {
+ BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+ ((VarBinaryVector) arrowVector).setSafe(i, bytesVector.vector[j],
bytesVector.start[j], bytesVector.length[j]);
+ };
+
+ //decimal and decimal64
+ private static final IntAndVectorsConsumer decimalNullSetter = (i,
arrowVector, hiveVector)
+ -> ((DecimalVector) arrowVector).setNull(i);
+ private final IntIntAndVectorsConsumer decimalValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> {
+ final DecimalVector decimalVector = (DecimalVector) arrowVector;
+ final int scale = decimalVector.getScale();
+ decimalVector.set(i, ((DecimalColumnVector)
hiveVector).vector[j].getHiveDecimal().bigDecimalValue().setScale(scale));
+
+ final HiveDecimalWritable writable = ((DecimalColumnVector)
hiveVector).vector[i];
+ decimalHolder.precision = writable.precision();
+ decimalHolder.scale = scale;
+ try (ArrowBuf arrowBuf = allocator.buffer(DecimalHolder.WIDTH)) {
+ decimalHolder.buffer = arrowBuf;
+ final BigInteger bigInteger = new
BigInteger(writable.getInternalStorage()).
+ multiply(BigInteger.TEN.pow(scale - writable.scale()));
+ decimalVector.set(i, new BigDecimal(bigInteger, scale));
+ }
+ };
+ private static final IntIntAndVectorsConsumer decimal64ValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> {
+ final DecimalVector decimalVector = (DecimalVector) arrowVector;
+ final int scale = decimalVector.getScale();
+ HiveDecimalWritable decimalHolder = new HiveDecimalWritable();
+ decimalHolder.setFromLongAndScale(((Decimal64ColumnVector)
hiveVector).vector[j], scale);
+ decimalVector.set(i,
decimalHolder.getHiveDecimal().bigDecimalValue().setScale(scale));
+ };
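The Decimal64 path above stores the unscaled long directly in the column vector; rescaling it for the Arrow DecimalVector is equivalent to the following one-liner, a sketch using java.math.BigDecimal rather than the Hive writable.

  // e.g. unscaledValue = 12345, scale = 2  ->  123.45
  static java.math.BigDecimal decimal64ToBigDecimal(long unscaledValue, int scale) {
    return java.math.BigDecimal.valueOf(unscaledValue, scale);
  }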
+
+ //interval year
+ private static final IntAndVectorsConsumer intervalYearMonthNullSetter = (i,
arrowVector, hiveVector)
+ -> ((IntervalYearVector) arrowVector).setNull(i);
+ private static IntIntAndVectorsConsumer intervalYearMonthValueSetter = (i,
j, arrowVector, hiveVector, typeInfo)
+ -> ((IntervalYearVector) arrowVector).set(i, (int) ((LongColumnVector)
hiveVector).vector[j]);
+
+ //interval day
+ private static final IntAndVectorsConsumer intervalDayTimeNullSetter = (i,
arrowVector, hiveVector)
+ -> ((IntervalDayVector) arrowVector).setNull(i);
+ private static IntIntAndVectorsConsumer intervalDayTimeValueSetter = (i, j,
arrowVector, hiveVector, typeInfo)
+ -> {
+ final IntervalDayVector intervalDayVector = (IntervalDayVector)
arrowVector;
+ final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
+ (IntervalDayTimeColumnVector) hiveVector;
+ long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(j);
+ final long days = totalSeconds / SECOND_PER_DAY;
+ final long millis =
+ (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
+ intervalDayTimeColumnVector.getNanos(j) / NS_PER_MILLIS;
+ intervalDayVector.set(i, (int) days, (int) millis);
+ };
+
+ //Used for setting null at arrowVector[i]
+ private interface IntAndVectorsConsumer {
+ void accept(int i, FieldVector arrowVector, ColumnVector hiveVector);
+ }
+
+ //Used to copy value from hiveVector[j] -> arrowVector[i]
+ //since hiveVector might be referenced through vector.selected
+ private interface IntIntAndVectorsConsumer {
+ void accept(int i, int j, FieldVector arrowVector, ColumnVector
hiveVector, TypeInfo typeInfo);
+ }
+
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 1946cecc084..2dd12ef1918 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import
org.apache.hadoop.hive.ql.exec.vector.filesink.VectorFileSinkArrowOperator;
import
org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyLongOperator;
import
org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyMultiKeyOperator;
import
org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyStringOperator;
@@ -4120,6 +4121,48 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
+  private boolean checkForArrowFileSink(FileSinkDesc fileSinkDesc,
+      boolean isTezOrSpark, VectorizationContext vContext,
+      VectorFileSinkDesc vectorDesc) throws HiveException {
+
+    // Various restrictions.
+
+    boolean isVectorizationFileSinkArrowNativeEnabled =
+        HiveConf.getBoolVar(hiveConf,
+            HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED);
+
+    String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+
+    String serdeClassName = fileSinkDesc.getTableInfo().getSerdeClassName();
+
+    boolean isOkArrowFileSink =
+        serdeClassName.equals("org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe") &&
+        isVectorizationFileSinkArrowNativeEnabled &&
+        engine.equalsIgnoreCase("tez");
+
+    return isOkArrowFileSink;
+  }
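In other words, the native Arrow file sink is chosen only when all three conditions hold. The standalone sketch below restates that predicate; the parameter names are stand-ins, not Hive APIs, and the comments map them back to the values used above.

  // serdeClassName     -> from FileSinkDesc.getTableInfo().getSerdeClassName()
  // arrowNativeEnabled -> HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED
  // executionEngine    -> HiveConf.ConfVars.HIVE_EXECUTION_ENGINE
  static boolean canUseNativeArrowFileSink(String serdeClassName, boolean arrowNativeEnabled,
      String executionEngine) {
    return "org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe".equals(serdeClassName)
        && arrowNativeEnabled
        && "tez".equalsIgnoreCase(executionEngine);
  }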
+
+ private Operator<? extends OperatorDesc> specializeArrowFileSinkOperator(
+ Operator<? extends OperatorDesc> op, VectorizationContext vContext,
FileSinkDesc desc,
+ VectorFileSinkDesc vectorDesc) throws HiveException {
+
+ Class<? extends Operator<?>> opClass = VectorFileSinkArrowOperator.class;
+
+ Operator<? extends OperatorDesc> vectorOp = null;
+ try {
+ vectorOp = OperatorFactory.getVectorOperator(
+ opClass, op.getCompilationOpContext(), op.getConf(),
+ vContext, vectorDesc);
+ } catch (Exception e) {
+ LOG.info("Vectorizer vectorizeOperator file sink class exception " +
opClass.getSimpleName() +
+ " exception " + e);
+ throw new HiveException(e);
+ }
+
+ return vectorOp;
+ }
+
private boolean usesVectorUDFAdaptor(VectorExpression vecExpr) {
if (vecExpr == null) {
return false;
@@ -5130,9 +5173,20 @@ public class Vectorizer implements PhysicalPlanResolver {
FileSinkDesc fileSinkDesc = (FileSinkDesc) op.getConf();
VectorFileSinkDesc vectorFileSinkDesc = new VectorFileSinkDesc();
- vectorOp = OperatorFactory.getVectorOperator(
- op.getCompilationOpContext(), fileSinkDesc, vContext,
vectorFileSinkDesc);
- isNative = false;
+ boolean isArrowSpecialization =
+ checkForArrowFileSink(fileSinkDesc, isTezOrSpark, vContext,
vectorFileSinkDesc);
+
+ if (isArrowSpecialization) {
+ vectorOp =
+ specializeArrowFileSinkOperator(
+ op, vContext, fileSinkDesc, vectorFileSinkDesc);
+ isNative = true;
+ } else {
+ vectorOp =
+ OperatorFactory.getVectorOperator(
+ op.getCompilationOpContext(), fileSinkDesc, vContext,
vectorFileSinkDesc);
+ isNative = false;
+ }
}
break;
case LIMIT:
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
index 10484ac9252..bf20aae5195 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.arrow;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.arrow.vector.VarCharVector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -102,6 +103,7 @@ public class TestArrowColumnarBatchSerDe {
{text(""), charW("", 10), varcharW("", 10)},
{text("Hello"), charW("Hello", 10), varcharW("Hello", 10)},
{text("world!"), charW("world!", 10), varcharW("world!", 10)},
+ {text("안녕?"), charW("안녕?", 10), varcharW("안녕?", 10)},
{null, null, null},
};
@@ -525,6 +527,31 @@ public class TestArrowColumnarBatchSerDe {
initAndSerializeAndDeserialize(schema, DECIMAL_ROWS);
}
+ @Test
+ public void testRandomPrimitiveDecimal() throws SerDeException {
+ String[][] schema = {
+ {"decimal1", "decimal(38,10)"},
+ };
+
+ int size = 1000;
+ Object[][] randomDecimals = new Object[size][];
+ Random random = new Random();
+ for (int i = 0; i < size; i++) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(random.nextBoolean() ? '+' : '-');
+ for (int j = 0; j < 28 ; j++) {
+ builder.append(random.nextInt(10));
+ }
+ builder.append('.');
+ for (int j = 0; j < 10; j++) {
+ builder.append(random.nextInt(10));
+ }
+      randomDecimals[i] = new Object[] {decimalW(HiveDecimal.create(builder.toString()))};
+ }
+
+ initAndSerializeAndDeserialize(schema, randomDecimals);
+ }
+
@Test
public void testPrimitiveBoolean() throws SerDeException {
String[][] schema = {
@@ -768,6 +795,32 @@ public class TestArrowColumnarBatchSerDe {
initAndSerializeAndDeserialize(schema, toMap(BINARY_ROWS));
}
+ @Test
+ public void testPrimitiveCharPadding() throws SerDeException {
+ String[][] schema = {
+ {"char1", "char(10)"},
+ };
+
+ HiveCharWritable[][] rows = new HiveCharWritable[][] {
+ {charW("Hello", 10)}, {charW("world!", 10)}};
+ ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();
+ StructObjectInspector rowOI = initSerDe(serDe, schema);
+
+ ArrowWrapperWritable serialized = null;
+ for (Object[] row : rows) {
+ serialized = serDe.serialize(row, rowOI);
+ }
+ // Pass null to complete a batch
+ if (serialized == null) {
+ serialized = serDe.serialize(null, rowOI);
+ }
+
+    VarCharVector varCharVector = (VarCharVector) serialized.getVectorSchemaRoot().getFieldVectors().get(0);
+    for (int i = 0; i < rows.length; i++) {
+      assertEquals(rows[i][0].getPaddedValue().toString(), new String(varCharVector.get(i)));
+ }
+ }
+
public void testMapDecimal() throws SerDeException {
String[][] schema = {
{"decimal_map", "map<string,decimal(38,10)>"},
diff --git
a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java
b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java
index 162e8e6353a..6ae2969171e 100644
---
a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java
+++
b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java
@@ -129,6 +129,21 @@ public class StringExpr {
return charCount;
}
+  public static byte[] padRight(byte[] bytes, int start, int length, int maxCharacterLength) {
+    final byte[] resultBytes;
+    final int characterLength = StringExpr.characterCount(bytes, start, length);
+    final int blankPadLength = Math.max(maxCharacterLength - characterLength, 0);
+    final int resultLength = length + blankPadLength;
+    resultBytes = new byte[resultLength];
+    final int resultStart = 0;
+    System.arraycopy(bytes, start, resultBytes, resultStart, length);
+    final int padEnd = resultStart + resultLength;
+    for (int p = resultStart + length; p < padEnd; p++) {
+      resultBytes[p] = ' ';
+    }
+    return resultBytes;
+  }
+
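A short usage sketch for the new helper, assuming the Hive storage-api jar is on the classpath; the class name below is made up for illustration.

  import java.nio.charset.StandardCharsets;
  import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;

  public class PadRightExample {
    public static void main(String[] args) {
      byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
      // CHAR(10): pad to 10 characters; multi-byte UTF-8 sequences count as one character
      byte[] padded = StringExpr.padRight(hello, 0, hello.length, 10);
      System.out.println("[" + new String(padded, StandardCharsets.UTF_8) + "]"); // [Hello     ]
    }
  }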
// A setVal with the same function signature as rightTrim, leftTrim,
truncate, etc, below.
// Useful for class generation via templates.
public static void assign(BytesColumnVector outV, int i, byte[] bytes, int
start, int length) {