This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d6668c7  HIVE-22221: Llap external client - Need to reduce LlapBaseInputFormat#getSplits() footprint (Shubham Chaurasia, reviewed by Jason Dere)
d6668c7 is described below

commit d6668c705da3b4afb4ae3e113e21ae5f0c53b8ae
Author: Shubham Chaurasia <[email protected]>
AuthorDate: Wed Sep 25 13:32:00 2019 +0530

    HIVE-22221: Llap external client - Need to reduce LlapBaseInputFormat#getSplits() footprint (Shubham Chaurasia, reviewed by Jason Dere)
    
    Signed-off-by: Sankar Hariappan <[email protected]>
---
 ...a => AbstractTestJdbcGenericUDTFGetSplits.java} | 167 ++++++++------------
 .../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java |   8 +-
 .../hive/jdbc/TestJdbcGenericUDTFGetSplits.java    | 168 ++-------------------
 .../hive/jdbc/TestJdbcGenericUDTFGetSplits2.java   |  31 ++++
 .../apache/hive/jdbc/TestNewGetSplitsFormat.java   | 116 ++++++++++++++
 .../apache/hadoop/hive/llap/LlapInputSplit.java    |   8 +
 .../hadoop/hive/llap/LlapBaseInputFormat.java      |  40 ++++-
 .../hadoop/hive/ql/exec/FunctionRegistry.java      |   1 +
 .../hive/ql/udf/generic/GenericUDTFGetSplits.java  | 168 ++++++++++++++-------
 .../hive/ql/udf/generic/GenericUDTFGetSplits2.java | 109 +++++++++++++
 .../results/clientpositive/show_functions.q.out    |   2 +
 11 files changed, 490 insertions(+), 328 deletions(-)
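
For context on the change exercised below: the new lightweight split format is requested purely through job configuration on the llap-ext-client side. A rough sketch of how an external client might consume it, modelled on the new TestNewGetSplitsFormat test in this patch (the connection URL, credentials, query and helper name here are placeholders, not values from this change):

    import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
    import org.apache.hadoop.hive.llap.LlapInputSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical helper; url/user/pwd/query are placeholders.
    static InputSplit[] fetchLightweightSplits(JobConf job, String url, String user,
        String pwd, String query) throws Exception {
      job.set(LlapBaseInputFormat.URL_KEY, url);
      job.set(LlapBaseInputFormat.USER_KEY, user);
      job.set(LlapBaseInputFormat.PWD_KEY, pwd);
      job.set(LlapBaseInputFormat.QUERY_KEY, query);
      job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "true");

      LlapBaseInputFormat inputFormat = new LlapBaseInputFormat();
      InputSplit[] splits = inputFormat.getSplits(job, 5);

      // Assumes at least three rows came back: with the new format, splits[0]
      // carries only the schema and splits[1] only the planBytes[]; the rest are
      // the actual splits and need both copied back in before getRecordReader().
      LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
      LlapInputSplit planSplit = (LlapInputSplit) splits[1];
      for (int i = 2; i < splits.length; i++) {
        LlapInputSplit s = (LlapInputSplit) splits[i];
        s.setSchema(schemaSplit.getSchema());
        s.setPlanBytes(planSplit.getPlanBytes());
      }
      return java.util.Arrays.copyOfRange(splits, 2, splits.length);
    }

With USE_NEW_SPLIT_FORMAT left unset (the default), getSplits() keeps returning fully populated splits, so existing callers are unaffected.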

diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java
similarity index 55%
copy from itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
copy to itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java
index f6f64b8..02dfe7c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTestJdbcGenericUDTFGetSplits.java
@@ -16,10 +16,14 @@
 
 package org.apache.hive.jdbc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -34,34 +38,20 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.FieldDesc;
-import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.llap.LlapInputSplit;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hive.jdbc.miniHS2.MiniHS2;
-import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
-public class TestJdbcGenericUDTFGetSplits {
+/**
+ * AbstractTestJdbcGenericUDTFGetSplits.
+ */
+public abstract class AbstractTestJdbcGenericUDTFGetSplits {
   protected static MiniHS2 miniHS2 = null;
   protected static String dataFileDir;
-  static Path kvDataFilePath;
   protected static String tableName = "testtab1";
-
   protected static HiveConf conf = null;
+  static Path kvDataFilePath;
   protected Connection hs2Conn = null;
 
   @BeforeClass
@@ -74,21 +64,21 @@ public class TestJdbcGenericUDTFGetSplits {
 
     conf = new HiveConf();
     conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
-    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-    conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
-    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
-    conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
-    conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
-    conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
-    conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none");
-    conf.setVar(ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text");
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
+    conf.setBoolVar(HiveConf.ConfVars.TEZ_EXEC_SUMMARY, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
+    conf.setVar(HiveConf.ConfVars.LLAP_IO_MEMORY_MODE, "none");
+    conf.setVar(HiveConf.ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text");
 
 
     conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
-      + "/tez-site.xml"));
+        + "/tez-site.xml"));
 
-    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+    miniHS2 = new MiniHS2(conf, MiniHS2.MiniClusterType.LLAP);
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
 
@@ -97,17 +87,6 @@ public class TestJdbcGenericUDTFGetSplits {
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
   }
 
-  @Before
-  public void setUp() throws Exception {
-    hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    LlapBaseInputFormat.closeAll();
-    hs2Conn.close();
-  }
-
   @AfterClass
   public static void afterTest() throws Exception {
     if (miniHS2.isStarted()) {
@@ -115,68 +94,19 @@ public class TestJdbcGenericUDTFGetSplits {
     }
   }
 
-  @Test(timeout = 200000)
-  public void testGenericUDTFOrderBySplitCount1() throws Exception {
-    String query = "select get_splits(" + "'select value from " + tableName + "', 5)";
-    runQuery(query, getConfigs(), 10);
-
-    query = "select get_splits(" + "'select value from " + tableName + " order by under_col', 5)";
-    runQuery(query, getConfigs(), 1);
-
-    query = "select get_splits(" + "'select value from " + tableName + " order by under_col limit 0', 5)";
-    runQuery(query, getConfigs(), 0);
-
-    query = "select get_splits(" +
-      "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
-    runQuery(query, getConfigs(), 1);
-
-    List<String> setCmds = getConfigs();
-    setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false");
-    query = "select get_splits(" +
-      "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
-    runQuery(query, setCmds, 10);
+  @Before
+  public void setUp() throws Exception {
+    hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
   }
 
-  @Test
-  public void testDecimalPrecisionAndScale() throws Exception {
-    try (Statement stmt = hs2Conn.createStatement()) {
-      stmt.execute("CREATE TABLE decimal_test_table(decimal_col DECIMAL(6,2))");
-      stmt.execute("INSERT INTO decimal_test_table VALUES(2507.92)");
-
-      ResultSet rs = stmt.executeQuery("SELECT * FROM decimal_test_table");
-      assertTrue(rs.next());
-      rs.close();
-
-      String url = miniHS2.getJdbcURL();
-      String user = System.getProperty("user.name");
-      String pwd = user;
-      String handleId = UUID.randomUUID().toString();
-      String sql = "SELECT avg(decimal_col)/3 FROM decimal_test_table";
-
-      // make request through llap-ext-client
-      JobConf job = new JobConf(conf);
-      job.set(LlapBaseInputFormat.URL_KEY, url);
-      job.set(LlapBaseInputFormat.USER_KEY, user);
-      job.set(LlapBaseInputFormat.PWD_KEY, pwd);
-      job.set(LlapBaseInputFormat.QUERY_KEY, sql);
-      job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
-
-      LlapBaseInputFormat llapBaseInputFormat = new LlapBaseInputFormat();
-      //schema split
-      LlapInputSplit schemaSplit = (LlapInputSplit) llapBaseInputFormat.getSplits(job, 0)[0];
-      assertNotNull(schemaSplit);
-      FieldDesc fieldDesc = schemaSplit.getSchema().getColumns().get(0);
-      DecimalTypeInfo type = (DecimalTypeInfo) fieldDesc.getTypeInfo();
-      assertEquals(38, type.getPrecision());
-      assertEquals(24, type.scale());
-
-      LlapBaseInputFormat.close(handleId);
-    }
+  @After
+  public void tearDown() throws Exception {
+    LlapBaseInputFormat.closeAll();
+    hs2Conn.close();
   }
 
-
-  private void runQuery(final String query, final List<String> setCmds,
-    final int numRows) throws Exception {
+  protected void runQuery(final String query, final List<String> setCmds,
+                          final int numRows) throws Exception {
 
     Connection con = hs2Conn;
     BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
@@ -194,7 +124,7 @@ public class TestJdbcGenericUDTFGetSplits {
           }
         }
         ResultSet resultSet = selStmt.executeQuery(query);
-        while(resultSet.next()) {
+        while (resultSet.next()) {
           rowCount++;
         }
       } catch (SQLException e) {
@@ -210,7 +140,7 @@ public class TestJdbcGenericUDTFGetSplits {
 
   }
 
-  List<String> getConfigs(String... more) {
+  protected List<String> getConfigs(String... more) {
     List<String> setCmds = new ArrayList<>();
     setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
     setCmds.add("set mapred.min.split.size=10");
@@ -224,4 +154,25 @@ public class TestJdbcGenericUDTFGetSplits {
     }
     return setCmds;
   }
-}
\ No newline at end of file
+
+  protected void testGenericUDTFOrderBySplitCount1(String udtfName, int[] expectedCounts) throws Exception {
+    String query = "select " + udtfName + "(" + "'select value from " + tableName + "', 5)";
+    runQuery(query, getConfigs(), expectedCounts[0]);
+
+    query = "select " + udtfName + "(" + "'select value from " + tableName + " order by under_col', 5)";
+    runQuery(query, getConfigs(), expectedCounts[1]);
+
+    query = "select " + udtfName + "(" + "'select value from " + tableName + " order by under_col limit 0', 5)";
+    runQuery(query, getConfigs(), expectedCounts[2]);
+
+    query = "select " + udtfName + "(" +
+        "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+    runQuery(query, getConfigs(), expectedCounts[3]);
+
+    List<String> setCmds = getConfigs();
+    setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false");
+    query = "select " + udtfName + "(" +
+        "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
+    runQuery(query, setCmds, expectedCounts[4]);
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 8467cea..f4e9f9a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -101,11 +101,12 @@ import org.apache.hadoop.mapred.InputFormat;
  * by sub-classes in a {@link org.junit.BeforeClass} initializer
  */
 public abstract class BaseJdbcWithMiniLlap {
-  private static MiniHS2 miniHS2 = null;
+
   private static String dataFileDir;
   private static Path kvDataFilePath;
   private static Path dataTypesFilePath;
 
+  protected static MiniHS2 miniHS2 = null;
   protected static HiveConf conf = null;
   protected static Connection hs2Conn = null;
 
@@ -456,7 +457,7 @@ public abstract class BaseJdbcWithMiniLlap {
     assertArrayEquals(new String[] {"val_0", "3"}, rowCollector.rows.get(0));
   }
 
-  private interface RowProcessor {
+  protected interface RowProcessor {
     void process(Row row);
   }
 
@@ -506,7 +507,8 @@ public abstract class BaseJdbcWithMiniLlap {
 
   protected abstract InputFormat<NullWritable, Row> getInputFormat();
 
-  private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception {
+  protected int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor)
+      throws Exception {
     String url = miniHS2.getJdbcURL();
     String user = System.getProperty("user.name");
     String pwd = user;
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
index f6f64b8..7eae613 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java
@@ -16,125 +16,29 @@
 
 package org.apache.hive.jdbc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.net.URL;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
 import org.apache.hadoop.hive.llap.LlapInputSplit;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hive.jdbc.miniHS2.MiniHS2;
-import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-
-public class TestJdbcGenericUDTFGetSplits {
-  protected static MiniHS2 miniHS2 = null;
-  protected static String dataFileDir;
-  static Path kvDataFilePath;
-  protected static String tableName = "testtab1";
-
-  protected static HiveConf conf = null;
-  protected Connection hs2Conn = null;
-
-  @BeforeClass
-  public static void beforeTest() throws Exception {
-    Class.forName(MiniHS2.getJdbcDriverName());
-
-    String confDir = "../../data/conf/llap/";
-    HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
-    System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
-
-    conf = new HiveConf();
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
-    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-    conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
-    conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
-    conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
-    conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
-    conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
-    conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none");
-    conf.setVar(ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text");
-
-
-    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
-      + "/tez-site.xml"));
-
-    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
-    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
-    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
-
-    Map<String, String> confOverlay = new HashMap<>();
-    miniHS2.start(confOverlay);
-    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
-  }
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.UUID;
 
-  @After
-  public void tearDown() throws Exception {
-    LlapBaseInputFormat.closeAll();
-    hs2Conn.close();
-  }
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
-  @AfterClass
-  public static void afterTest() throws Exception {
-    if (miniHS2.isStarted()) {
-      miniHS2.stop();
-    }
-  }
+/**
+ * TestJdbcGenericUDTFGetSplits.
+ */
+public class TestJdbcGenericUDTFGetSplits extends AbstractTestJdbcGenericUDTFGetSplits {
 
   @Test(timeout = 200000)
   public void testGenericUDTFOrderBySplitCount1() throws Exception {
-    String query = "select get_splits(" + "'select value from " + tableName + "', 5)";
-    runQuery(query, getConfigs(), 10);
-
-    query = "select get_splits(" + "'select value from " + tableName + " order by under_col', 5)";
-    runQuery(query, getConfigs(), 1);
-
-    query = "select get_splits(" + "'select value from " + tableName + " order by under_col limit 0', 5)";
-    runQuery(query, getConfigs(), 0);
-
-    query = "select get_splits(" +
-      "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
-    runQuery(query, getConfigs(), 1);
-
-    List<String> setCmds = getConfigs();
-    setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false");
-    query = "select get_splits(" +
-      "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)";
-    runQuery(query, setCmds, 10);
+    super.testGenericUDTFOrderBySplitCount1("get_splits", new int[]{10, 1, 0, 1, 10});
   }
 
   @Test
@@ -174,54 +78,4 @@ public class TestJdbcGenericUDTFGetSplits {
     }
   }
 
-
-  private void runQuery(final String query, final List<String> setCmds,
-    final int numRows) throws Exception {
-
-    Connection con = hs2Conn;
-    BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
-
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    System.setErr(new PrintStream(baos)); // capture stderr
-    final Statement selStmt = con.createStatement();
-    Throwable throwable = null;
-    int rowCount = 0;
-    try {
-      try {
-        if (setCmds != null) {
-          for (String setCmd : setCmds) {
-            selStmt.execute(setCmd);
-          }
-        }
-        ResultSet resultSet = selStmt.executeQuery(query);
-        while(resultSet.next()) {
-          rowCount++;
-        }
-      } catch (SQLException e) {
-        throwable = e;
-      }
-      selStmt.close();
-      assertNull(throwable);
-      System.out.println("Expected " + numRows + " rows for query '" + query + "'. Got: " + rowCount);
-      assertEquals("Expected rows: " + numRows + " got: " + rowCount, numRows, rowCount);
-    } finally {
-      baos.close();
-    }
-
-  }
-
-  List<String> getConfigs(String... more) {
-    List<String> setCmds = new ArrayList<>();
-    setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
-    setCmds.add("set mapred.min.split.size=10");
-    setCmds.add("set mapred.max.split.size=10");
-    setCmds.add("set tez.grouping.min-size=10");
-    setCmds.add("set tez.grouping.max-size=10");
-    // to get at least 10 splits
-    setCmds.add("set tez.grouping.split-waves=10");
-    if (more != null) {
-      setCmds.addAll(Arrays.asList(more));
-    }
-    return setCmds;
-  }
 }
\ No newline at end of file
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java
new file mode 100644
index 0000000..3301745
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits2.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import org.junit.Test;
+
+/**
+ * TestJdbcGenericUDTFGetSplits2.
+ */
+public class TestJdbcGenericUDTFGetSplits2 extends AbstractTestJdbcGenericUDTFGetSplits {
+
+  @Test(timeout = 200000)
+  public void testGenericUDTFOrderBySplitCount1() throws Exception {
+    super.testGenericUDTFOrderBySplitCount1("get_llap_splits", new int[]{12, 3, 1, 3, 12});
+  }
+
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java
new file mode 100644
index 0000000..e2884d1
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestNewGetSplitsFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hive.jdbc;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.BeforeClass;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * TestNewGetSplitsFormat.
+ */
+public class TestNewGetSplitsFormat extends BaseJdbcWithMiniLlap {
+
+  @BeforeClass public static void beforeTest() throws Exception {
+    HiveConf conf = defaultConf();
+    conf.setBoolVar(HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
+    BaseJdbcWithMiniLlap.beforeTest(conf);
+  }
+
+  @Override protected InputFormat<NullWritable, Row> getInputFormat() {
+    //For unit testing, no harm in hard-coding allocator ceiling to LONG.MAX_VALUE
+    return new LlapArrowRowInputFormat(Long.MAX_VALUE);
+  }
+
+  @Override public void testDataTypes() throws Exception {
+    TestJdbcWithMiniLlapVectorArrow testJdbcWithMiniLlapVectorArrow = new TestJdbcWithMiniLlapVectorArrow();
+    testJdbcWithMiniLlapVectorArrow.testDataTypes();
+  }
+
+  @Override protected int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor)
+      throws Exception {
+    String url = miniHS2.getJdbcURL();
+    String user = System.getProperty("user.name");
+    String pwd = user;
+    String handleId = UUID.randomUUID().toString();
+
+    InputFormat<NullWritable, Row> inputFormat = getInputFormat();
+
+    // Get splits
+    JobConf job = new JobConf(conf);
+    job.set(LlapBaseInputFormat.URL_KEY, url);
+    job.set(LlapBaseInputFormat.USER_KEY, user);
+    job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+    job.set(LlapBaseInputFormat.QUERY_KEY, query);
+    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
+    job.set(LlapBaseInputFormat.USE_NEW_SPLIT_FORMAT, "true");
+    if (currentDatabase != null) {
+      job.set(LlapBaseInputFormat.DB_KEY, currentDatabase);
+    }
+
+    InputSplit[] splits = inputFormat.getSplits(job, numSplits);
+    assertTrue(splits.length > 2);
+
+    // populate actual splits with schema and planBytes[]
+    LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
+    LlapInputSplit planSplit = (LlapInputSplit) splits[1];
+
+    List<LlapInputSplit> actualSplits = new ArrayList<>();
+
+    for (int i = 2; i < splits.length; i++) {
+      LlapInputSplit actualSplit = (LlapInputSplit) splits[i];
+      actualSplit.setSchema(schemaSplit.getSchema());
+      actualSplit.setPlanBytes(planSplit.getPlanBytes());
+      actualSplits.add(actualSplit);
+    }
+
+    // Fetch rows from splits
+    int rowCount = 0;
+    for (InputSplit split : actualSplits) {
+      System.out.println("Processing split " + split.getLocations());
+      RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
+      Row row = reader.createValue();
+      while (reader.next(NullWritable.get(), row)) {
+        rowProcessor.process(row);
+        ++rowCount;
+      }
+      //In arrow-mode this will throw exception unless all buffers have been released
+      //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader
+      reader.close();
+    }
+    LlapBaseInputFormat.close(handleId);
+
+    return rowCount;
+  }
+
+}
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index 2896651..068a913 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -91,6 +91,14 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
     return tokenBytes;
   }
 
+  public void setPlanBytes(byte[] planBytes) {
+    this.planBytes = planBytes;
+  }
+
+  public void setSchema(Schema schema) {
+    this.schema = schema;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(splitNum);
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 2aa82b5..5c99655 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
@@ -116,10 +115,11 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
   public static final String PWD_KEY = "llap.if.pwd";
   public static final String HANDLE_ID = "llap.if.handleid";
   public static final String DB_KEY = "llap.if.database";
+  public static final String USE_NEW_SPLIT_FORMAT = "llap.if.use.new.split.format";
   public static final String SESSION_QUERIES_FOR_GET_NUM_SPLITS = "llap.session.queries.for.get.num.splits";
   public static final Pattern SET_QUERY_PATTERN = Pattern.compile("^\\s*set\\s+.*=.+$", Pattern.CASE_INSENSITIVE);
 
-  public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
+  public static final String SPLIT_QUERY = "select get_llap_splits(\"%s\",%d)";
   public static final LlapServiceInstance[] serviceInstanceArray = new LlapServiceInstance[0];
 
   public LlapBaseInputFormat(String url, String user, String pwd, String query) {
@@ -281,13 +281,43 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
           }
         }
 
+        // In case of USE_NEW_SPLIT_FORMAT=true, following format is used
+        //       type                  split
+        // schema-split             LlapInputSplit -- contains only schema
+        // plan-split               LlapInputSplit -- contains only planBytes[]
+        // 0                        LlapInputSplit -- actual split 1
+        // 1                        LlapInputSplit -- actual split 2
+        // ...                         ...
+        boolean useNewSplitFormat = job.getBoolean(USE_NEW_SPLIT_FORMAT, false);
+
         ResultSet res = stmt.executeQuery(sql);
+        int count = 0;
+        LlapInputSplit schemaSplit = null;
+        LlapInputSplit planSplit = null;
         while (res.next()) {
           // deserialize split
-          DataInput in = new DataInputStream(res.getBinaryStream(1));
-          InputSplitWithLocationInfo is = new LlapInputSplit();
+          DataInput in = new DataInputStream(res.getBinaryStream(2));
+          LlapInputSplit is = new LlapInputSplit();
           is.readFields(in);
-          ins.add(is);
+          if (useNewSplitFormat) {
+            ins.add(is);
+          } else {
+            // to keep the old format, populate schema and planBytes[] in actual splits
+            if (count == 0) {
+              schemaSplit = is;
+              if (numSplits == 0) {
+                ins.add(schemaSplit);
+              }
+            } else if (count == 1) {
+              planSplit = is;
+            } else {
+              is.setSchema(schemaSplit.getSchema());
+              assert planSplit != null;
+              is.setPlanBytes(planSplit.getPlanBytes());
+              ins.add(is);
+            }
+            count++;
+          }
         }
         res.close();
       } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 62d082d..b0d7a4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -538,6 +538,7 @@ public final class FunctionRegistry {
     system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class);
     system.registerGenericUDTF("stack", GenericUDTFStack.class);
     system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class);
+    system.registerGenericUDTF("get_llap_splits", GenericUDTFGetSplits2.class);
     system.registerGenericUDTF("get_sql_schema", GenericUDTFGetSQLSchema.class);
 
     //PTF declarations
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 5c760e8..b6974fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -135,14 +135,30 @@ public class GenericUDTFGetSplits extends GenericUDTF {
   protected transient JobConf jc;
   private boolean orderByQuery;
   private boolean forceSingleSplit;
-  private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
-  private DataOutput dos = new DataOutputStream(bos);
+  protected ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+  protected DataOutput dos = new DataOutputStream(bos);
+  protected String inputArgQuery;
+  protected int inputArgNumSplits;
+  protected boolean schemaSplitOnly;
 
   @Override
   public StructObjectInspector initialize(ObjectInspector[] arguments)
       throws UDFArgumentException {
-
     LOG.debug("initializing GenericUDFGetSplits");
+    validateInput(arguments);
+
+    List<String> names = Arrays.asList("split");
+    List<ObjectInspector> fieldOIs = Arrays
+        .<ObjectInspector> asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+    StructObjectInspector outputOI = ObjectInspectorFactory
+        .getStandardStructObjectInspector(names, fieldOIs);
+
+    LOG.debug("done initializing GenericUDFGetSplits");
+    return outputOI;
+  }
+
+  protected void validateInput(ObjectInspector[] arguments)
+      throws UDFArgumentLengthException, UDFArgumentTypeException {
 
     if (SessionState.get() == null || SessionState.get().getConf() == null) {
       throw new IllegalStateException("Cannot run get splits outside HS2");
@@ -167,15 +183,6 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
     stringOI = (StringObjectInspector) arguments[0];
     intOI = (IntObjectInspector) arguments[1];
-
-    List<String> names = Arrays.asList("split");
-    List<ObjectInspector> fieldOIs = Arrays
-        .<ObjectInspector> asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
-    StructObjectInspector outputOI = ObjectInspectorFactory
-        .getStandardStructObjectInspector(names, fieldOIs);
-
-    LOG.debug("done initializing GenericUDFGetSplits");
-    return outputOI;
   }
 
   public static class PlanFragment {
@@ -192,9 +199,31 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
   @Override
   public void process(Object[] arguments) throws HiveException {
+    initArgs(arguments);
+    try {
+      SplitResult splitResult = getSplitResult(false);
+      InputSplit[] splits = schemaSplitOnly ? new InputSplit[]{splitResult.schemaSplit} : splitResult.actualSplits;
+      for (InputSplit s : splits) {
+        Object[] os = new Object[1];
+        bos.reset();
+        s.write(dos);
+        byte[] frozen = bos.toByteArray();
+        os[0] = frozen;
+        forward(os);
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
 
-    String query = stringOI.getPrimitiveJavaObject(arguments[0]);
-    int num = intOI.get(arguments[1]);
+  protected void initArgs(Object[] arguments) {
+    inputArgQuery = stringOI.getPrimitiveJavaObject(arguments[0]);
+    inputArgNumSplits = intOI.get(arguments[1]);
+    schemaSplitOnly = inputArgNumSplits == 0;
+  }
+
+  protected SplitResult getSplitResult(boolean generateLightWeightSplits)
+      throws HiveException, IOException {
 
     // Generate applicationId for the LLAP splits
     LlapCoordinator coordinator = LlapCoordinator.getInstance();
@@ -205,32 +234,36 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     ApplicationId applicationId = coordinator.createExtClientAppId();
     LOG.info("Generated appID {} for LLAP splits", applicationId.toString());
 
-    PlanFragment fragment = createPlanFragment(query, num, applicationId);
+    PlanFragment fragment = createPlanFragment(inputArgQuery, applicationId);
     TezWork tezWork = fragment.work;
     Schema schema = fragment.schema;
 
     boolean generateSingleSplit = forceSingleSplit && orderByQuery;
-    try {
-      InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit);
-      LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, query,
-        orderByQuery, forceSingleSplit);
-      if (generateSingleSplit && splits.length > 1) {
-        throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + query);
+
+    SplitResult splitResult = getSplits(jc, tezWork, schema, applicationId, generateSingleSplit,
+        generateLightWeightSplits);
+    validateSplitResult(splitResult, generateLightWeightSplits, generateSingleSplit);
+    return splitResult;
+  }
+
+  private void validateSplitResult(SplitResult splitResult, boolean generateLightWeightSplits,
+                                   boolean generateSingleSplit) throws HiveException {
+    Preconditions.checkNotNull(splitResult.schemaSplit, "schema split cannot be null");
+    if (!schemaSplitOnly) {
+      InputSplit[] splits = splitResult.actualSplits;
+      if (splits.length > 0 && generateLightWeightSplits) {
+        Preconditions.checkNotNull(splitResult.planSplit, "plan split cannot be null");
       }
-      for (InputSplit s : splits) {
-        Object[] os = new Object[1];
-        bos.reset();
-        s.write(dos);
-        byte[] frozen = bos.toByteArray();
-        os[0] = frozen;
-        forward(os);
+      LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, inputArgQuery,
+          orderByQuery, forceSingleSplit);
+      if (generateSingleSplit && splits.length > 1) {
+        throw new HiveException("Got more than one split (Got: " + splits.length
+            + ") for order by query: " + inputArgQuery);
       }
-    } catch (Exception e) {
-      throw new HiveException(e);
     }
   }
 
-  public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId)
+  private PlanFragment createPlanFragment(String query, ApplicationId splitsAppId)
       throws HiveException {
 
     HiveConf conf = new HiveConf(SessionState.get().getConf());
@@ -250,7 +283,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     // hive compiler is going to remove inner order by. disable that optimization until then.
     HiveConf.setBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY, false);
 
-    if(num == 0) {
+    if (schemaSplitOnly) {
       //Schema only
       try {
         List<FieldSchema> fieldSchemas = ParseUtils.parseQueryAndGetSchema(conf, query);
@@ -378,16 +411,21 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
   }
 
-  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId,
-    final boolean generateSingleSplit)
-    throws IOException {
 
-    if(numSplits == 0) {
-      //Schema only
-      LlapInputSplit schemaSplit = new LlapInputSplit(
-          0, new byte[0], new byte[0], new byte[0],
-          new SplitLocationInfo[0], schema, "", new byte[0]);
-      return new InputSplit[] { schemaSplit };
+  // generateLightWeightSplits - if true then
+  // 1) schema and planBytes[] in each LlapInputSplit are not populated
+  // 2) schemaSplit(contains only schema) and planSplit(contains only planBytes[]) are populated in SplitResult
+  private SplitResult getSplits(JobConf job, TezWork work, Schema schema, ApplicationId applicationId,
+                               final boolean generateSingleSplit, boolean generateLightWeightSplits)
+      throws IOException {
+
+    SplitResult splitResult = new SplitResult();
+    splitResult.schemaSplit = new LlapInputSplit(
+        0, new byte[0], new byte[0], new byte[0],
+        new SplitLocationInfo[0], schema, "", new byte[0]);
+    if (schemaSplitOnly) {
+      // schema only
+      return splitResult;
     }
 
     DAG dag = DAG.create(work.getName());
@@ -429,14 +467,15 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
       HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork, generateSingleSplit);
       List<Event> eventList = splitGenerator.initialize();
-      InputSplit[] result = new InputSplit[eventList.size() - 1];
+      int numGroupedSplitsGenerated = eventList.size() - 1;
+      InputSplit[] result = new InputSplit[numGroupedSplitsGenerated];
 
       InputConfigureVertexTasksEvent configureEvent
         = (InputConfigureVertexTasksEvent) eventList.get(0);
 
       List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
 
-      Preconditions.checkState(hints.size() == eventList.size() - 1);
+      Preconditions.checkState(hints.size() == numGroupedSplitsGenerated);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("NumEvents=" + eventList.size() + ", NumSplits=" + result.length);
@@ -472,11 +511,14 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       // Generate umbilical token (applies to all splits)
       Token<JobTokenIdentifier> umbilicalToken = JobTokenCreator.createJobToken(applicationId);
 
-      LOG.info("Number of splits: " + (eventList.size() - 1));
+      LOG.info("Number of splits: " + numGroupedSplitsGenerated);
       SignedMessage signedSvs = null;
-      for (int i = 0; i < eventList.size() - 1; i++) {
+      byte[] submitWorkBytes = null;
+      final byte[] emptySubmitWorkBytes = new byte[0];
+      final Schema emptySchema = new Schema();
+      for (int i = 0; i < numGroupedSplitsGenerated; i++) {
         TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName,
-              eventList.size() - 1, applicationId, i);
+            numGroupedSplitsGenerated, applicationId, i);
 
         // 2. Generate the vertex/submit information for all events.
         if (i == 0) {
@@ -488,28 +530,44 @@ public class GenericUDTFGetSplits extends GenericUDTF {
           // Despite the differences in TaskSpec, the vertex spec should be the same.
           signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, queryUser,
               applicationId.toString());
+          SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId,
+              System.currentTimeMillis(), numGroupedSplitsGenerated, signedSvs.message,
+              signedSvs.signature, umbilicalToken);
+          submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
+          if (generateLightWeightSplits) {
+            splitResult.planSplit = new LlapInputSplit(
+                0, submitWorkBytes, new byte[0], new byte[0],
+                new SplitLocationInfo[0], new Schema(), "", new byte[0]);
+          }
         }
 
-        SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId,
-            System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message,
-            signedSvs.signature, umbilicalToken);
-        byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
-
         // 3. Generate input event.
         SignedMessage eventBytes = makeEventBytes(wx, vertexName, eventList.get(i + 1), signer);
 
         // 4. Make location hints.
         SplitLocationInfo[] locations = makeLocationHints(hints.get(i));
 
-        result[i] = new LlapInputSplit(i, submitWorkBytes, eventBytes.message,
-            eventBytes.signature, locations, schema, llapUser, tokenBytes);
-       }
-      return result;
+        if (generateLightWeightSplits) {
+          result[i] = new LlapInputSplit(i, emptySubmitWorkBytes, eventBytes.message,
+              eventBytes.signature, locations, emptySchema, llapUser, tokenBytes);
+        } else {
+          result[i] = new LlapInputSplit(i, submitWorkBytes, eventBytes.message,
+              eventBytes.signature, locations, schema, llapUser, tokenBytes);
+        }
+      }
+      splitResult.actualSplits = result;
+      return splitResult;
     } catch (Exception e) {
       throw new IOException(e);
     }
   }
 
+  static class SplitResult {
+    InputSplit schemaSplit;
+    InputSplit planSplit;
+    InputSplit[] actualSplits;
+  }
+
   private static class DriverCleanup implements Closeable {
     private final Driver driver;
     private final HiveTxnManager txnManager;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits2.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits2.java
new file mode 100644
index 0000000..ae53ab7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits2.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * GenericUDTFGetSplits2 -  Memory efficient version of GenericUDTFGetSplits.
+ * It separates out information like schema and planBytes[] which is common to all the splits.
+ * This produces output in following format.
+ * <p>
+ * type                           split
+ * ----------------------------------------------------
+ * schema-split             LlapInputSplit -- contains only schema
+ * plan-split               LlapInputSplit -- contains only planBytes[]
+ * 0                        LlapInputSplit -- actual split 1
+ * 1                        LlapInputSplit -- actual split 2
+ * ...                         ...
+ */
+@Description(name = "get_llap_splits", value = "_FUNC_(string,int) - "
+    + "Returns an array of length int serialized splits for the referenced tables string."
+    + " Passing length 0 returns only schema data for the compiled query. "
+    + "The order of splits is: schema-split, plan-split, 0, 1, 2...where 0, 1, 2...are the actual splits "
+    + "This UDTF is for internal use by LlapBaseInputFormat and not to be invoked explicitly")
+@UDFType(deterministic = false)
+public class GenericUDTFGetSplits2 extends GenericUDTFGetSplits {
+  private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits2.class);
+
+  @Override public StructObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+    LOG.debug("initializing GenericUDFGetSplits2");
+    validateInput(arguments);
+
+    List<String> names = Arrays.asList("type", "split");
+    List<ObjectInspector> fieldOIs = Arrays.asList(PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+        PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+    StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+
+    LOG.debug("done initializing GenericUDFGetSplits2");
+    return outputOI;
+  }
+
+  @Override public void process(Object[] arguments) throws HiveException {
+    try {
+      initArgs(arguments);
+      SplitResult splitResult = getSplitResult(true);
+      forwardOutput(splitResult);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  private void forwardOutput(SplitResult splitResult) throws IOException, HiveException {
+    for (Map.Entry<String, InputSplit> entry : transformSplitResult(splitResult).entrySet()) {
+      Object[] os = new Object[2];
+      os[0] = entry.getKey();
+      InputSplit split = entry.getValue();
+      bos.reset();
+      split.write(dos);
+      os[1] = bos.toByteArray();
+      forward(os);
+    }
+  }
+
+  private Map<String, InputSplit> transformSplitResult(SplitResult splitResult) {
+    Map<String, InputSplit> splitMap = new LinkedHashMap<>();
+    splitMap.put("schema-split", splitResult.schemaSplit);
+    if (splitResult.actualSplits != null && splitResult.actualSplits.length > 0) {
+      Preconditions.checkNotNull(splitResult.planSplit);
+      splitMap.put("plan-split", splitResult.planSplit);
+      for (int i = 0; i < splitResult.actualSplits.length; i++) {
+        splitMap.put("" + i, splitResult.actualSplits[i]);
+      }
+    }
+    return splitMap;
+  }
+}
diff --git a/ql/src/test/results/clientpositive/show_functions.q.out b/ql/src/test/results/clientpositive/show_functions.q.out
index 04a0fb4..d88a5f2 100644
--- a/ql/src/test/results/clientpositive/show_functions.q.out
+++ b/ql/src/test/results/clientpositive/show_functions.q.out
@@ -129,6 +129,7 @@ format_number
 from_unixtime
 from_utc_timestamp
 get_json_object
+get_llap_splits
 get_splits
 get_sql_schema
 greatest
@@ -562,6 +563,7 @@ format_number
 from_unixtime
 from_utc_timestamp
 get_json_object
+get_llap_splits
 get_splits
 get_sql_schema
 greatest
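
At the SQL level, the new get_llap_splits UDTF emits one (type, split) row per split, which is why LlapBaseInputFormat#getSplits() above now reads the split bytes from result column 2. A hypothetical JDBC-level sketch of inspecting that output (the connection, helper name and the quoted query are placeholders, not values from this change):

    // Hypothetical helper illustrating the new row shape of get_llap_splits.
    static void dumpLlapSplitRows(java.sql.Connection conn) throws Exception {
      try (java.sql.Statement stmt = conn.createStatement();
           java.sql.ResultSet rs = stmt.executeQuery(
               "select get_llap_splits('select value from testtab1', 5)")) {
        while (rs.next()) {
          String type = rs.getString(1);  // "schema-split", "plan-split", "0", "1", ...
          org.apache.hadoop.hive.llap.LlapInputSplit split =
              new org.apache.hadoop.hive.llap.LlapInputSplit();
          split.readFields(new java.io.DataInputStream(rs.getBinaryStream(2)));
          System.out.println("split row type: " + type);
        }
      }
    }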
