This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d6668c7 HIVE-22221: Llap external client - Need to reduce LlapBaseInputFormat#getSplits() footprint (Shubham Chaurasia, reviewed by Jason Dere)
d6668c7 is described below
commit d6668c705da3b4afb4ae3e113e21ae5f0c53b8ae
Author: Shubham Chaurasia <[email protected]>
AuthorDate: Wed Sep 25 13:32:00 2019 +0530
HIVE-22221: Llap external client - Need to reduce LlapBaseInputFormat#getSplits() footprint (Shubham Chaurasia, reviewed by Jason Dere)
Signed-off-by: Sankar Hariappan <[email protected]>
---
...a => AbstractTestJdbcGenericUDTFGetSplits.java} | 167 ++++++++------------
.../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 8 +-
.../hive/jdbc/TestJdbcGenericUDTFGetSplits.java | 168 ++-------------------
.../hive/jdbc/TestJdbcGenericUDTFGetSplits2.java | 31 ++++
.../apache/hive/jdbc/TestNewGetSplitsFormat.java | 116 ++++++++++++++
.../apache/hadoop/hive/llap/LlapInputSplit.java | 8 +
.../hadoop/hive/llap/LlapBaseInputFormat.java | 40 ++++-
.../hadoop/hive/ql/exec/FunctionRegistry.java | 1 +
.../hive/ql/udf/generic/GenericUDTFGetSplits.java | 168 ++++++++++++++-------
.../hive/ql/udf/generic/GenericUDTFGetSplits2.java | 109 +++++++++++++
.../results/clientpositive/show_functions.q.out | 2 +
11 files changed, 490 insertions(+), 328 deletions(-)
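For reviewers following along, the patch adds a new get_llap_splits UDTF plus a LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT job option, so the LLAP external client can return one schema-only split and one plan-only split instead of duplicating the schema and plan bytes into every split. Below is a minimal client-side sketch of the new path, modelled on the TestNewGetSplitsFormat test added in this patch; the JDBC URL, password, and query are placeholders, not values taken from the patch:

    // Sketch only: fetch splits through the LLAP external client and opt in to
    // the light-weight split format added by this change.
    String handleId = UUID.randomUUID().toString();
    JobConf job = new JobConf(conf);
    job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default"); // placeholder
    job.set(LlapBaseInputFormat.USER_KEY, System.getProperty("user.name"));
    job.set(LlapBaseInputFormat.PWD_KEY, "password");                             // placeholder
    job.set(LlapBaseInputFormat.QUERY_KEY, "select value from testtab1");         // placeholder
    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
    job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "true");

    LlapBaseInputFormat inputFormat = new LlapBaseInputFormat();
    // With the new format, splits[0] carries only the schema, splits[1] only the
    // plan bytes, and splits[2..] are the actual work splits.
    InputSplit[] splits = inputFormat.getSplits(job, 5);
    LlapBaseInputFormat.close(handleId); // releases the connection registered for handleId

When the flag is absent or false, getSplits() copies the schema and plan bytes back into each split internally, so existing callers keep working unchanged.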
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java
similarity index 55%
copy from itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
copy to itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java
index f6f64b8..02dfe7c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java
@@ -16,10 +16,14 @@
package org.apache.hive.jdbc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -34,34 +38,20 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.FieldDesc;
-import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.llap.LlapInputSplit;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hive.jdbc.miniHS2.MiniHS2;
-import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
-public class TestJdbcGenericUDTFGetSplits {
+/**
+ * AbstractTestJdbcGenericUDTFGetSplits.
+ */
+public abstract class AbstractTestJdbcGenericUDTFGetSplits {
protected static MiniHS2 miniHS2 = null;
protected static String dataFileDir;
- static Path kvDataFilePath;
protected static String tableName = "testtab1";
-
protected static HiveConf conf = null;
+ static Path kvDataFilePath;
protected Connection hs2Conn = null;
@BeforeClass
@@ -74,21 +64,21 @@ public class TestJdbcGenericUDTFGetSplits {
conf = new HiveConf();
conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
- conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
- conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
- conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
- conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
- conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
- conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
- conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none");
- conf.setVar(ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text");
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
+ conf.setBoolVar(HiveConf.ConfVars.TEZ_EXEC_SUMMARY, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
+ conf.setVar(HiveConf.ConfVars.LLAP_IO_MEMORY_MODE, "none");
+ conf.setVar(HiveConf.ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text");
conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
- + "/tez-site.xml"));
+ + "/tez-site.xml"));
- miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+ miniHS2 = new MiniHS2(conf, MiniHS2.MiniClusterType.LLAP);
dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
kvDataFilePath = new Path(dataFileDir, "kv1.txt");
@@ -97,17 +87,6 @@ public class TestJdbcGenericUDTFGetSplits {
miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
}
- @Before
- public void setUp() throws Exception {
- hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
- }
-
- @After
- public void tearDown() throws Exception {
- LlapBaseInputFormat.closeAll();
- hs2Conn.close();
- }
-
@AfterClass
public static void afterTest() throws Exception {
if (miniHS2.isStarted()) {
@@ -115,68 +94,19 @@ public class TestJdbcGenericUDTFGetSplits {
}
}
- @Test(timeout = 200000)
- public void testGenericUDTFOrderBySplitCount1() throws Exception {
- String query = "select get_splits(" + "'select value from " + tableName +
"', 5)";
- runQuery(query, getConfigs(), 10);
-
- query = "select get_splits(" + "'select value from " + tableName + " order
by under_col', 5)";
- runQuery(query, getConfigs(), 1);
-
- query = "select get_splits(" + "'select value from " + tableName + " order
by under_col limit 0', 5)";
- runQuery(query, getConfigs(), 0);
-
- query = "select get_splits(" +
- "'select `value` from (select value from " + tableName + " where value
is not null order by value) as t', 5)";
- runQuery(query, getConfigs(), 1);
-
- List<String> setCmds = getConfigs();
- setCmds.add("set
hive.llap.external.splits.order.by.force.single.split=false");
- query = "select get_splits(" +
- "'select `value` from (select value from " + tableName + " where value
is not null order by value) as t', 5)";
- runQuery(query, setCmds, 10);
+ @Before
+ public void setUp() throws Exception {
+ hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
}
- @Test
- public void testDecimalPrecisionAndScale() throws Exception {
- try (Statement stmt = hs2Conn.createStatement()) {
- stmt.execute("CREATE TABLE decimal_test_table(decimal_col
DECIMAL(6,2))");
- stmt.execute("INSERT INTO decimal_test_table VALUES(2507.92)");
-
- ResultSet rs = stmt.executeQuery("SELECT * FROM decimal_test_table");
- assertTrue(rs.next());
- rs.close();
-
- String url = miniHS2.getJdbcURL();
- String user = System.getProperty("user.name");
- String pwd = user;
- String handleId = UUID.randomUUID().toString();
- String sql = "SELECT avg(decimal_col)/3 FROM decimal_test_table";
-
- // make request through llap-ext-client
- JobConf job = new JobConf(conf);
- job.set(LlapBaseInputFormat.URL_KEY, url);
- job.set(LlapBaseInputFormat.USER_KEY, user);
- job.set(LlapBaseInputFormat.PWD_KEY, pwd);
- job.set(LlapBaseInputFormat.QUERY_KEY, sql);
- job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
-
- LlapBaseInputFormat llapBaseInputFormat = new LlapBaseInputFormat();
- //schema split
- LlapInputSplit schemaSplit = (LlapInputSplit) llapBaseInputFormat.getSplits(job, 0)[0];
- assertNotNull(schemaSplit);
- FieldDesc fieldDesc = schemaSplit.getSchema().getColumns().get(0);
- DecimalTypeInfo type = (DecimalTypeInfo) fieldDesc.getTypeInfo();
- assertEquals(38, type.getPrecision());
- assertEquals(24, type.scale());
-
- LlapBaseInputFormat.close(handleId);
- }
+ @After
+ public void tearDown() throws Exception {
+ LlapBaseInputFormat.closeAll();
+ hs2Conn.close();
}
-
- private void runQuery(final String query, final List<String> setCmds,
- final int numRows) throws Exception {
+ protected void runQuery(final String query, final List<String> setCmds,
+ final int numRows) throws Exception {
Connection con = hs2Conn;
BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
@@ -194,7 +124,7 @@ public class TestJdbcGenericUDTFGetSplits {
}
}
ResultSet resultSet = selStmt.executeQuery(query);
- while(resultSet.next()) {
+ while (resultSet.next()) {
rowCount++;
}
} catch (SQLException e) {
@@ -210,7 +140,7 @@ public class TestJdbcGenericUDTFGetSplits {
}
- List<String> getConfigs(String... more) {
+ protected List<String> getConfigs(String... more) {
List<String> setCmds = new ArrayList<>();
setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
setCmds.add("set mapred.min.split.size=10");
@@ -224,4 +154,25 @@ public class TestJdbcGenericUDTFGetSplits {
}
return setCmds;
}
-}
\ No newline at end of file
+
+ protected void testGenericUDTFOrderBySplitCount1(String udtfName, int[] expectedCounts) throws Exception {
+ String query = "select " + udtfName + "(" + "'select value from " + tableName + "', 5)";
+ runQuery(query, getConfigs(), expectedCounts[0]);
+
+ query = "select " + udtfName + "(" + "'select value from " + tableName + " order by under_col', 5)";
+ runQuery(query, getConfigs(), expectedCounts[1]);
+
+ query = "select " + udtfName + "(" + "'select value from " + tableName + " order by under_col limit 0', 5)";
+ runQuery(query, getConfigs(), expectedCounts[2]);
+
+ query = "select " + udtfName + "(" +
+ "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+ runQuery(query, getConfigs(), expectedCounts[3]);
+
+ List<String> setCmds = getConfigs();
+ setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false");
+ query = "select " + udtfName + "(" +
+ "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+ runQuery(query, setCmds, expectedCounts[4]);
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 8467cea..f4e9f9a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -101,11 +101,12 @@ import org.apache.hadoop.mapred.InputFormat;
* by sub-classes in a {@link org.junit.BeforeClass} initializer
*/
public abstract class BaseJdbcWithMiniLlap {
- private static MiniHS2 miniHS2 = null;
+
private static String dataFileDir;
private static Path kvDataFilePath;
private static Path dataTypesFilePath;
+ protected static MiniHS2 miniHS2 = null;
protected static HiveConf conf = null;
protected static Connection hs2Conn = null;
@@ -456,7 +457,7 @@ public abstract class BaseJdbcWithMiniLlap {
assertArrayEquals(new String[] {"val_0", "3"}, rowCollector.rows.get(0));
}
- private interface RowProcessor {
+ protected interface RowProcessor {
void process(Row row);
}
@@ -506,7 +507,8 @@ public abstract class BaseJdbcWithMiniLlap {
protected abstract InputFormat<NullWritable, Row> getInputFormat();
- private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception {
+ protected int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor)
+ throws Exception {
String url = miniHS2.getJdbcURL();
String user = System.getProperty("user.name");
String pwd = user;
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
index f6f64b8..7eae613 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
@@ -16,125 +16,29 @@
package org.apache.hive.jdbc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.net.URL;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.FieldDesc;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hive.jdbc.miniHS2.MiniHS2;
-import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
-import com.google.common.collect.Lists;
-
-public class TestJdbcGenericUDTFGetSplits {
- protected static MiniHS2 miniHS2 = null;
- protected static String dataFileDir;
- static Path kvDataFilePath;
- protected static String tableName = "testtab1";
-
- protected static HiveConf conf = null;
- protected Connection hs2Conn = null;
-
- @BeforeClass
- public static void beforeTest() throws Exception {
- Class.forName(MiniHS2.getJdbcDriverName());
-
- String confDir = "../../data/conf/llap/";
- HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
- System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
-
- conf = new HiveConf();
- conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
- conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
- conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
- conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
- conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
- conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
- conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
- conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none");
- conf.setVar(ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text");
-
-
- conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
- + "/tez-site.xml"));
-
- miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
- dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
- kvDataFilePath = new Path(dataFileDir, "kv1.txt");
-
- Map<String, String> confOverlay = new HashMap<>();
- miniHS2.start(confOverlay);
- miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
- }
-
- @Before
- public void setUp() throws Exception {
- hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
- }
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.UUID;
- @After
- public void tearDown() throws Exception {
- LlapBaseInputFormat.closeAll();
- hs2Conn.close();
- }
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
- @AfterClass
- public static void afterTest() throws Exception {
- if (miniHS2.isStarted()) {
- miniHS2.stop();
- }
- }
+/**
+ * TestJdbcGenericUDTFGetSplits.
+ */
+public class TestJdbcGenericUDTFGetSplits extends AbstractTestJdbcGenericUDTFGetSplits {
@Test(timeout = 200000)
public void testGenericUDTFOrderBySplitCount1() throws Exception {
- String query = "select get_splits(" + "'select value from " + tableName +
"', 5)";
- runQuery(query, getConfigs(), 10);
-
- query = "select get_splits(" + "'select value from " + tableName + " order
by under_col', 5)";
- runQuery(query, getConfigs(), 1);
-
- query = "select get_splits(" + "'select value from " + tableName + " order
by under_col limit 0', 5)";
- runQuery(query, getConfigs(), 0);
-
- query = "select get_splits(" +
- "'select `value` from (select value from " + tableName + " where value
is not null order by value) as t', 5)";
- runQuery(query, getConfigs(), 1);
-
- List<String> setCmds = getConfigs();
- setCmds.add("set
hive.llap.external.splits.order.by.force.single.split=false");
- query = "select get_splits(" +
- "'select `value` from (select value from " + tableName + " where value
is not null order by value) as t', 5)";
- runQuery(query, setCmds, 10);
+ super.testGenericUDTFOrderBySplitCount1("get_splits", new int[]{10, 1, 0,
1, 10});
}
@Test
@@ -174,54 +78,4 @@ public class TestJdbcGenericUDTFGetSplits {
}
}
-
- private void runQuery(final String query, final List<String> setCmds,
- final int numRows) throws Exception {
-
- Connection con = hs2Conn;
- BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- System.setErr(new PrintStream(baos)); // capture stderr
- final Statement selStmt = con.createStatement();
- Throwable throwable = null;
- int rowCount = 0;
- try {
- try {
- if (setCmds != null) {
- for (String setCmd : setCmds) {
- selStmt.execute(setCmd);
- }
- }
- ResultSet resultSet = selStmt.executeQuery(query);
- while(resultSet.next()) {
- rowCount++;
- }
- } catch (SQLException e) {
- throwable = e;
- }
- selStmt.close();
- assertNull(throwable);
- System.out.println("Expected " + numRows + " rows for query '" + query +
"'. Got: " + rowCount);
- assertEquals("Expected rows: " + numRows + " got: " + rowCount, numRows,
rowCount);
- } finally {
- baos.close();
- }
-
- }
-
- List<String> getConfigs(String... more) {
- List<String> setCmds = new ArrayList<>();
- setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
- setCmds.add("set mapred.min.split.size=10");
- setCmds.add("set mapred.max.split.size=10");
- setCmds.add("set tez.grouping.min-size=10");
- setCmds.add("set tez.grouping.max-size=10");
- // to get at least 10 splits
- setCmds.add("set tez.grouping.split-waves=10");
- if (more != null) {
- setCmds.addAll(Arrays.asList(more));
- }
- return setCmds;
- }
}
\ No newline at end of file
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java
new file mode 100644
index 0000000..3301745
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import org.junit.Test;
+
+/**
+ * TestJdbcGenericUDTFGetSplits2.
+ */
+public class TestJdbcGenericUDTFGetSplits2 extends AbstractTestJdbcGenericUDTFGetSplits {
+
+ @Test(timeout = 200000)
+ public void testGenericUDTFOrderBySplitCount1() throws Exception {
+ super.testGenericUDTFOrderBySplitCount1("get_llap_splits", new int[]{12, 3, 1, 3, 12});
+ }
+
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java
new file mode 100644
index 0000000..e2884d1
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hive.jdbc;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.BeforeClass;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TestNewGetSplitsFormat.
+ */
+public class TestNewGetSplitsFormat extends BaseJdbcWithMiniLlap {
+
+ @BeforeClass public static void beforeTest() throws Exception {
+ HiveConf conf = defaultConf();
+ conf.setBoolVar(HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
+ BaseJdbcWithMiniLlap.beforeTest(conf);
+ }
+
+ @Override protected InputFormat<NullWritable, Row> getInputFormat() {
+ //For unit testing, no harm in hard-coding allocator ceiling to LONG.MAX_VALUE
+ return new LlapArrowRowInputFormat(Long.MAX_VALUE);
+ }
+
+ @Override public void testDataTypes() throws Exception {
+ TestJdbcWithMiniLlapVectorArrow testJdbcWithMiniLlapVectorArrow = new TestJdbcWithMiniLlapVectorArrow();
+ testJdbcWithMiniLlapVectorArrow.testDataTypes();
+ }
+
+ @Override protected int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor)
+ throws Exception {
+ String url = miniHS2.getJdbcURL();
+ String user = System.getProperty("user.name");
+ String pwd = user;
+ String handleId = UUID.randomUUID().toString();
+
+ InputFormat<NullWritable, Row> inputFormat = getInputFormat();
+
+ // Get splits
+ JobConf job = new JobConf(conf);
+ job.set(LlapBaseInputFormat.URL_KEY, url);
+ job.set(LlapBaseInputFormat.USER_KEY, user);
+ job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+ job.set(LlapBaseInputFormat.QUERY_KEY, query);
+ job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
+ job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "true");
+ if (currentDatabase != null) {
+ job.set(LlapBaseInputFormat.DB_KEY, currentDatabase);
+ }
+
+ InputSplit[] splits = inputFormat.getSplits(job, numSplits);
+ assertTrue(splits.length > 2);
+
+ // populate actual splits with schema and planBytes[]
+ LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
+ LlapInputSplit planSplit = (LlapInputSplit) splits[1];
+
+ List<LlapInputSplit> actualSplits = new ArrayList<>();
+
+ for (int i = 2; i < splits.length; i++) {
+ LlapInputSplit actualSplit = (LlapInputSplit) splits[i];
+ actualSplit.setSchema(schemaSplit.getSchema());
+ actualSplit.setPlanBytes(planSplit.getPlanBytes());
+ actualSplits.add(actualSplit);
+ }
+
+ // Fetch rows from splits
+ int rowCount = 0;
+ for (InputSplit split : actualSplits) {
+ System.out.println("Processing split " + split.getLocations());
+ RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
+ Row row = reader.createValue();
+ while (reader.next(NullWritable.get(), row)) {
+ rowProcessor.process(row);
+ ++rowCount;
+ }
+ //In arrow-mode this will throw exception unless all buffers have been released
+ //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader
+ reader.close();
+ }
+ LlapBaseInputFormat.close(handleId);
+
+ return rowCount;
+ }
+
+}
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index 2896651..068a913 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -91,6 +91,14 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
return tokenBytes;
}
+ public void setPlanBytes(byte[] planBytes) {
+ this.planBytes = planBytes;
+ }
+
+ public void setSchema(Schema schema) {
+ this.schema = schema;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(splitNum);
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 2aa82b5..5c99655 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
@@ -116,10 +115,11 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
public static final String PWD_KEY = "llap.if.pwd";
public static final String HANDLE_ID = "llap.if.handleid";
public static final String DB_KEY = "llap.if.database";
+ public static final String USE_NEW_SPLIT_FORMAT = "llap.if.use.new.split.format";
public static final String SESSION_QUERIES_FOR_GET_NUM_SPLITS =
"llap.session.queries.for.get.num.splits";
public static final Pattern SET_QUERY_PATTERN =
Pattern.compile("^\\s*set\\s+.*=.+$", Pattern.CASE_INSENSITIVE);
- public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
+ public static final String SPLIT_QUERY = "select get_llap_splits(\"%s\",%d)";
public static final LlapServiceInstance[] serviceInstanceArray = new LlapServiceInstance[0];
public LlapBaseInputFormat(String url, String user, String pwd, String query) {
@@ -281,13 +281,43 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
}
}
+ // In case of USE_NEW_SPLIT_FORMAT=true, following format is used
+ // type split
+ // schema-split LlapInputSplit -- contains only schema
+ // plan-split LlapInputSplit -- contains only planBytes[]
+ // 0 LlapInputSplit -- actual split 1
+ // 1 LlapInputSplit -- actual split 2
+ // ... ...
+ boolean useNewSplitFormat = job.getBoolean(USE_NEW_SPLIT_FORMAT, false);
+
ResultSet res = stmt.executeQuery(sql);
+ int count = 0;
+ LlapInputSplit schemaSplit = null;
+ LlapInputSplit planSplit = null;
while (res.next()) {
// deserialize split
- DataInput in = new DataInputStream(res.getBinaryStream(1));
- InputSplitWithLocationInfo is = new LlapInputSplit();
+ DataInput in = new DataInputStream(res.getBinaryStream(2));
+ LlapInputSplit is = new LlapInputSplit();
is.readFields(in);
- ins.add(is);
+ if (useNewSplitFormat) {
+ ins.add(is);
+ } else {
+ // to keep the old format, populate schema and planBytes[] in actual splits
+ if (count == 0) {
+ schemaSplit = is;
+ if (numSplits == 0) {
+ ins.add(schemaSplit);
+ }
+ } else if (count == 1) {
+ planSplit = is;
+ } else {
+ is.setSchema(schemaSplit.getSchema());
+ assert planSplit != null;
+ is.setPlanBytes(planSplit.getPlanBytes());
+ ins.add(is);
+ }
+ count++;
+ }
}
res.close();
} catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 62d082d..b0d7a4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -538,6 +538,7 @@ public final class FunctionRegistry {
system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class);
system.registerGenericUDTF("stack", GenericUDTFStack.class);
system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class);
+ system.registerGenericUDTF("get_llap_splits", GenericUDTFGetSplits2.class);
system.registerGenericUDTF("get_sql_schema",
GenericUDTFGetSQLSchema.class);
//PTF declarations
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 5c760e8..b6974fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -135,14 +135,30 @@ public class GenericUDTFGetSplits extends GenericUDTF {
protected transient JobConf jc;
private boolean orderByQuery;
private boolean forceSingleSplit;
- private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
- private DataOutput dos = new DataOutputStream(bos);
+ protected ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+ protected DataOutput dos = new DataOutputStream(bos);
+ protected String inputArgQuery;
+ protected int inputArgNumSplits;
+ protected boolean schemaSplitOnly;
@Override
public StructObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
-
LOG.debug("initializing GenericUDFGetSplits");
+ validateInput(arguments);
+
+ List<String> names = Arrays.asList("split");
+ List<ObjectInspector> fieldOIs = Arrays
+ .<ObjectInspector> asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+ StructObjectInspector outputOI = ObjectInspectorFactory
+ .getStandardStructObjectInspector(names, fieldOIs);
+
+ LOG.debug("done initializing GenericUDFGetSplits");
+ return outputOI;
+ }
+
+ protected void validateInput(ObjectInspector[] arguments)
+ throws UDFArgumentLengthException, UDFArgumentTypeException {
if (SessionState.get() == null || SessionState.get().getConf() == null) {
throw new IllegalStateException("Cannot run get splits outside HS2");
@@ -167,15 +183,6 @@ public class GenericUDTFGetSplits extends GenericUDTF {
stringOI = (StringObjectInspector) arguments[0];
intOI = (IntObjectInspector) arguments[1];
-
- List<String> names = Arrays.asList("split");
- List<ObjectInspector> fieldOIs = Arrays
- .<ObjectInspector>
asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
- StructObjectInspector outputOI = ObjectInspectorFactory
- .getStandardStructObjectInspector(names, fieldOIs);
-
- LOG.debug("done initializing GenericUDFGetSplits");
- return outputOI;
}
public static class PlanFragment {
@@ -192,9 +199,31 @@ public class GenericUDTFGetSplits extends GenericUDTF {
@Override
public void process(Object[] arguments) throws HiveException {
+ initArgs(arguments);
+ try {
+ SplitResult splitResult = getSplitResult(false);
+ InputSplit[] splits = schemaSplitOnly ? new InputSplit[]{splitResult.schemaSplit} : splitResult.actualSplits;
+ for (InputSplit s : splits) {
+ Object[] os = new Object[1];
+ bos.reset();
+ s.write(dos);
+ byte[] frozen = bos.toByteArray();
+ os[0] = frozen;
+ forward(os);
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
- String query = stringOI.getPrimitiveJavaObject(arguments[0]);
- int num = intOI.get(arguments[1]);
+ protected void initArgs(Object[] arguments) {
+ inputArgQuery = stringOI.getPrimitiveJavaObject(arguments[0]);
+ inputArgNumSplits = intOI.get(arguments[1]);
+ schemaSplitOnly = inputArgNumSplits == 0;
+ }
+
+ protected SplitResult getSplitResult(boolean generateLightWeightSplits)
+ throws HiveException, IOException {
// Generate applicationId for the LLAP splits
LlapCoordinator coordinator = LlapCoordinator.getInstance();
@@ -205,32 +234,36 @@ public class GenericUDTFGetSplits extends GenericUDTF {
ApplicationId applicationId = coordinator.createExtClientAppId();
LOG.info("Generated appID {} for LLAP splits", applicationId.toString());
- PlanFragment fragment = createPlanFragment(query, num, applicationId);
+ PlanFragment fragment = createPlanFragment(inputArgQuery, applicationId);
TezWork tezWork = fragment.work;
Schema schema = fragment.schema;
boolean generateSingleSplit = forceSingleSplit && orderByQuery;
- try {
- InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit);
- LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, query,
- orderByQuery, forceSingleSplit);
- if (generateSingleSplit && splits.length > 1) {
- throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + query);
+
+ SplitResult splitResult = getSplits(jc, tezWork, schema, applicationId, generateSingleSplit,
+ generateLightWeightSplits);
+ validateSplitResult(splitResult, generateLightWeightSplits, generateSingleSplit);
+ return splitResult;
+ }
+
+ private void validateSplitResult(SplitResult splitResult, boolean generateLightWeightSplits,
+ boolean generateSingleSplit) throws HiveException {
+ Preconditions.checkNotNull(splitResult.schemaSplit, "schema split cannot be null");
+ if (!schemaSplitOnly) {
+ InputSplit[] splits = splitResult.actualSplits;
+ if (splits.length > 0 && generateLightWeightSplits) {
+ Preconditions.checkNotNull(splitResult.planSplit, "plan split cannot be null");
}
- for (InputSplit s : splits) {
- Object[] os = new Object[1];
- bos.reset();
- s.write(dos);
- byte[] frozen = bos.toByteArray();
- os[0] = frozen;
- forward(os);
+ LOG.info("Generated {} splits for query {}. orderByQuery: {}
forceSingleSplit: {}", splits.length, inputArgQuery,
+ orderByQuery, forceSingleSplit);
+ if (generateSingleSplit && splits.length > 1) {
+ throw new HiveException("Got more than one split (Got: " +
splits.length
+ + ") for order by query: " + inputArgQuery);
}
- } catch (Exception e) {
- throw new HiveException(e);
}
}
- public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId)
+ private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId)
throws HiveException {
HiveConf conf = new HiveConf(SessionState.get().getConf());
@@ -250,7 +283,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
// hive compiler is going to remove inner order by. disable that optimization until then.
HiveConf.setBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY, false);
- if(num == 0) {
+ if (schemaSplitOnly) {
//Schema only
try {
List<FieldSchema> fieldSchemas = ParseUtils.parseQueryAndGetSchema(conf, query);
@@ -378,16 +411,21 @@ public class GenericUDTFGetSplits extends GenericUDTF {
}
}
- public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId,
- final boolean generateSingleSplit)
- throws IOException {
- if(numSplits == 0) {
- //Schema only
- LlapInputSplit schemaSplit = new LlapInputSplit(
- 0, new byte[0], new byte[0], new byte[0],
- new SplitLocationInfo[0], schema, "", new byte[0]);
- return new InputSplit[] { schemaSplit };
+ // generateLightWeightSplits - if true then
+ // 1) schema and planBytes[] in each LlapInputSplit are not populated
+ // 2) schemaSplit(contains only schema) and planSplit(contains only planBytes[]) are populated in SplitResult
+ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, ApplicationId applicationId,
+ final boolean generateSingleSplit, boolean generateLightWeightSplits)
+ throws IOException {
+
+ SplitResult splitResult = new SplitResult();
+ splitResult.schemaSplit = new LlapInputSplit(
+ 0, new byte[0], new byte[0], new byte[0],
+ new SplitLocationInfo[0], schema, "", new byte[0]);
+ if (schemaSplitOnly) {
+ // schema only
+ return splitResult;
}
DAG dag = DAG.create(work.getName());
@@ -429,14 +467,15 @@ public class GenericUDTFGetSplits extends GenericUDTF {
HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf,
mapWork, generateSingleSplit);
List<Event> eventList = splitGenerator.initialize();
- InputSplit[] result = new InputSplit[eventList.size() - 1];
+ int numGroupedSplitsGenerated = eventList.size() - 1;
+ InputSplit[] result = new InputSplit[numGroupedSplitsGenerated];
InputConfigureVertexTasksEvent configureEvent
= (InputConfigureVertexTasksEvent) eventList.get(0);
List<TaskLocationHint> hints =
configureEvent.getLocationHint().getTaskLocationHints();
- Preconditions.checkState(hints.size() == eventList.size() - 1);
+ Preconditions.checkState(hints.size() == numGroupedSplitsGenerated);
if (LOG.isDebugEnabled()) {
LOG.debug("NumEvents=" + eventList.size() + ", NumSplits=" +
result.length);
@@ -472,11 +511,14 @@ public class GenericUDTFGetSplits extends GenericUDTF {
// Generate umbilical token (applies to all splits)
Token<JobTokenIdentifier> umbilicalToken = JobTokenCreator.createJobToken(applicationId);
- LOG.info("Number of splits: " + (eventList.size() - 1));
+ LOG.info("Number of splits: " + numGroupedSplitsGenerated);
SignedMessage signedSvs = null;
- for (int i = 0; i < eventList.size() - 1; i++) {
+ byte[] submitWorkBytes = null;
+ final byte[] emptySubmitWorkBytes = new byte[0];
+ final Schema emptySchema = new Schema();
+ for (int i = 0; i < numGroupedSplitsGenerated; i++) {
TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName,
- eventList.size() - 1, applicationId, i);
+ numGroupedSplitsGenerated, applicationId, i);
// 2. Generate the vertex/submit information for all events.
if (i == 0) {
@@ -488,28 +530,44 @@ public class GenericUDTFGetSplits extends GenericUDTF {
// Despite the differences in TaskSpec, the vertex spec should be the same.
signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, queryUser,
applicationId.toString());
+ SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId,
+ System.currentTimeMillis(), numGroupedSplitsGenerated, signedSvs.message,
+ signedSvs.signature, umbilicalToken);
+ submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
+ if (generateLightWeightSplits) {
+ splitResult.planSplit = new LlapInputSplit(
+ 0, submitWorkBytes, new byte[0], new byte[0],
+ new SplitLocationInfo[0], new Schema(), "", new byte[0]);
+ }
}
- SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId,
- System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message,
- signedSvs.signature, umbilicalToken);
- byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
-
// 3. Generate input event.
SignedMessage eventBytes = makeEventBytes(wx, vertexName, eventList.get(i + 1), signer);
// 4. Make location hints.
SplitLocationInfo[] locations = makeLocationHints(hints.get(i));
- result[i] = new LlapInputSplit(i, submitWorkBytes, eventBytes.message,
- eventBytes.signature, locations, schema, llapUser, tokenBytes);
- }
- return result;
+ if (generateLightWeightSplits) {
+ result[i] = new LlapInputSplit(i, emptySubmitWorkBytes, eventBytes.message,
+ eventBytes.signature, locations, emptySchema, llapUser, tokenBytes);
+ } else {
+ result[i] = new LlapInputSplit(i, submitWorkBytes, eventBytes.message,
+ eventBytes.signature, locations, schema, llapUser, tokenBytes);
+ }
+ }
+ splitResult.actualSplits = result;
+ return splitResult;
} catch (Exception e) {
throw new IOException(e);
}
}
+ static class SplitResult {
+ InputSplit schemaSplit;
+ InputSplit planSplit;
+ InputSplit[] actualSplits;
+ }
+
private static class DriverCleanup implements Closeable {
private final Driver driver;
private final HiveTxnManager txnManager;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits2.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits2.java
new file mode 100644
index 0000000..ae53ab7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits2.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * GenericUDTFGetSplits2 - Memory efficient version of GenericUDTFGetSplits.
+ * It separates out information like schema and planBytes[] which is common to all the splits.
+ * This produces output in following format.
+ * <p>
+ * type split
+ * ----------------------------------------------------
+ * schema-split LlapInputSplit -- contains only schema
+ * plan-split LlapInputSplit -- contains only planBytes[]
+ * 0 LlapInputSplit -- actual split 1
+ * 1 LlapInputSplit -- actual split 2
+ * ... ...
+ */
+@Description(name = "get_llap_splits", value = "_FUNC_(string,int) - "
+ + "Returns an array of length int serialized splits for the referenced
tables string."
+ + " Passing length 0 returns only schema data for the compiled query. "
+ + "The order of splits is: schema-split, plan-split, 0, 1, 2...where 0, 1,
2...are the actual splits "
+ + "This UDTF is for internal use by LlapBaseInputFormat and not to be
invoked explicitly")
+@UDFType(deterministic = false)
+public class GenericUDTFGetSplits2 extends GenericUDTFGetSplits {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits2.class);
+
+ @Override public StructObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+ LOG.debug("initializing GenericUDFGetSplits2");
+ validateInput(arguments);
+
+ List<String> names = Arrays.asList("type", "split");
+ List<ObjectInspector> fieldOIs = Arrays.asList(PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+ StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+
+ LOG.debug("done initializing GenericUDFGetSplits2");
+ return outputOI;
+ }
+
+ @Override public void process(Object[] arguments) throws HiveException {
+ try {
+ initArgs(arguments);
+ SplitResult splitResult = getSplitResult(true);
+ forwardOutput(splitResult);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ private void forwardOutput(SplitResult splitResult) throws IOException, HiveException {
+ for (Map.Entry<String, InputSplit> entry : transformSplitResult(splitResult).entrySet()) {
+ Object[] os = new Object[2];
+ os[0] = entry.getKey();
+ InputSplit split = entry.getValue();
+ bos.reset();
+ split.write(dos);
+ os[1] = bos.toByteArray();
+ forward(os);
+ }
+ }
+
+ private Map<String, InputSplit> transformSplitResult(SplitResult splitResult) {
+ Map<String, InputSplit> splitMap = new LinkedHashMap<>();
+ splitMap.put("schema-split", splitResult.schemaSplit);
+ if (splitResult.actualSplits != null && splitResult.actualSplits.length > 0) {
+ Preconditions.checkNotNull(splitResult.planSplit);
+ splitMap.put("plan-split", splitResult.planSplit);
+ for (int i = 0; i < splitResult.actualSplits.length; i++) {
+ splitMap.put("" + i, splitResult.actualSplits[i]);
+ }
+ }
+ return splitMap;
+ }
+}
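When a caller does opt in via USE_NEW_SPLIT_FORMAT, it is responsible for re-attaching the shared schema and plan bytes before handing a split to a record reader, exactly as the new TestNewGetSplitsFormat test above does. A hedged sketch, assuming a splits array returned by a getSplits() call like the one shown earlier in this message:

    // splits[0] = schema-only split, splits[1] = plan-only split, splits[2..] = actual splits.
    LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
    LlapInputSplit planSplit = (LlapInputSplit) splits[1];
    List<LlapInputSplit> actualSplits = new ArrayList<>();
    for (int i = 2; i < splits.length; i++) {
      LlapInputSplit actual = (LlapInputSplit) splits[i];
      actual.setSchema(schemaSplit.getSchema());     // copy the shared schema back in
      actual.setPlanBytes(planSplit.getPlanBytes()); // copy the shared plan bytes back in
      actualSplits.add(actual);
    }
    // Each element of actualSplits can now be passed to getRecordReader() as before.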
diff --git a/ql/src/test/results/clientpositive/show_functions.q.out
b/ql/src/test/results/clientpositive/show_functions.q.out
index 04a0fb4..d88a5f2 100644
--- a/ql/src/test/results/clientpositive/show_functions.q.out
+++ b/ql/src/test/results/clientpositive/show_functions.q.out
@@ -129,6 +129,7 @@ format_number
from_unixtime
from_utc_timestamp
get_json_object
+get_llap_splits
get_splits
get_sql_schema
greatest
@@ -562,6 +563,7 @@ format_number
from_unixtime
from_utc_timestamp
get_json_object
+get_llap_splits
get_splits
get_sql_schema
greatest