Repository: hive Updated Branches: refs/heads/branch-3 d29011f5f -> 5d6d15bc0
Revert "HIVE-19794 : Disable removing order by from subquery in GenericUDTFGetSplits (Prasanth J via Jason Dere)" This reverts commit 7a830e634ae4d8c67d32e0e0f2da4a8a5f01bbc5. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b1304672 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b1304672 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b1304672 Branch: refs/heads/branch-3 Commit: b13046721d1be5f2e760d0897f6dd7efefc07353 Parents: d29011f Author: Prasanth Jayachandran <[email protected]> Authored: Sun Jun 10 22:10:40 2018 -0700 Committer: Prasanth Jayachandran <[email protected]> Committed: Sun Jun 10 22:10:40 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 5 - .../hive/jdbc/TestJdbcGenericUDTFGetSplits.java | 187 ------------------- .../hive/ql/exec/tez/HiveSplitGenerator.java | 30 +-- .../ql/udf/generic/GenericUDTFGetSplits.java | 31 ++- 4 files changed, 18 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b1304672/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index fc17636..b24bef5 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4161,11 +4161,6 @@ public class HiveConf extends Configuration { LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT("hive.llap.external.splits.temp.table.storage.format", "orc", new StringSet("default", "text", "orc"), "Storage format for temp tables created using LLAP external client"), - LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT("hive.llap.external.splits.order.by.force.single.split", - true, - "If LLAP external clients submits ORDER BY queries, force return a single split to guarantee reading\n" + - "data out in ordered way. Setting this to false will let external clients read data out in parallel\n" + - "losing the ordering (external clients are responsible for guaranteeing the ordering)"), LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false, "Override if grace join should be allowed to run in llap."), http://git-wip-us.apache.org/repos/asf/hive/blob/b1304672/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java deleted file mode 100644 index c8a428c..0000000 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcGenericUDTFGetSplits.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.jdbc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.PrintStream; -import java.net.URL; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.LlapBaseInputFormat; -import org.apache.hadoop.hive.metastore.api.WMTrigger; -import org.apache.hadoop.hive.ql.wm.Action; -import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; -import org.apache.hadoop.hive.ql.wm.Expression; -import org.apache.hadoop.hive.ql.wm.ExpressionFactory; -import org.apache.hadoop.hive.ql.wm.Trigger; -import org.apache.hive.jdbc.miniHS2.MiniHS2; -import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.collect.Lists; - -public class TestJdbcGenericUDTFGetSplits { - protected static MiniHS2 miniHS2 = null; - protected static String dataFileDir; - static Path kvDataFilePath; - protected static String tableName = "testtab1"; - - protected static HiveConf conf = null; - protected Connection hs2Conn = null; - - @BeforeClass - public static void beforeTest() throws Exception { - Class.forName(MiniHS2.getJdbcDriverName()); - - String confDir = "../../data/conf/llap/"; - HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); - System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); - - conf = new HiveConf(); - conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); - conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); - conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS); - conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true); - conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); - conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); - conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); - conf.setVar(ConfVars.LLAP_EXTERNAL_SPLITS_TEMP_TABLE_STORAGE_FORMAT, "text"); - - - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/tez-site.xml")); - - miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); - dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); - kvDataFilePath = new Path(dataFileDir, "kv1.txt"); - - Map<String, String> confOverlay = new HashMap<>(); - miniHS2.start(confOverlay); - miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); - } - - @Before - public void setUp() throws Exception { - hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - } - - @After - public void tearDown() throws Exception { - LlapBaseInputFormat.closeAll(); - hs2Conn.close(); - } - - @AfterClass - public static void afterTest() throws Exception { - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - } - - @Test(timeout = 200000) - public void testGenericUDTFOrderBySplitCount1() throws Exception { - String query = "select get_splits(" + "'select value from " + tableName + "', 5)"; - runQuery(query, getConfigs(), 10); - - query = "select get_splits(" + "'select value from " + tableName + " order by under_col', 5)"; - runQuery(query, getConfigs(), 1); - - query = "select get_splits(" + - "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)"; - runQuery(query, getConfigs(), 1); - - List<String> setCmds = getConfigs(); - setCmds.add("set hive.llap.external.splits.order.by.force.single.split=false"); - query = "select get_splits(" + - "'select `value` from (select value from " + tableName + " where value is not null order by value) as t', 5)"; - runQuery(query, setCmds, 10); - } - - private void runQuery(final String query, final List<String> setCmds, - final int numRows) throws Exception { - - Connection con = hs2Conn; - BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - System.setErr(new PrintStream(baos)); // capture stderr - final Statement selStmt = con.createStatement(); - Throwable throwable = null; - int rowCount = 0; - try { - try { - if (setCmds != null) { - for (String setCmd : setCmds) { - selStmt.execute(setCmd); - } - } - ResultSet resultSet = selStmt.executeQuery(query); - while(resultSet.next()) { - rowCount++; - } - } catch (SQLException e) { - throwable = e; - } - selStmt.close(); - assertNull(throwable); - System.out.println("Expected " + numRows + " rows for query '" + query + "'. Got: " + rowCount); - assertEquals("Expected rows: " + numRows + " got: " + rowCount, numRows, rowCount); - } finally { - baos.close(); - } - - } - - List<String> getConfigs(String... more) { - List<String> setCmds = new ArrayList<>(); - setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict"); - setCmds.add("set mapred.min.split.size=10"); - setCmds.add("set mapred.max.split.size=10"); - setCmds.add("set tez.grouping.min-size=10"); - setCmds.add("set tez.grouping.max-size=10"); - // to get at least 10 splits - setCmds.add("set tez.grouping.split-waves=10"); - if (more != null) { - setCmds.addAll(Arrays.asList(more)); - } - return setCmds; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/b1304672/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 5dc4a1f..57f6c66 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -29,9 +29,6 @@ import java.util.Set; import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.tez.common.counters.TezCounters; @@ -89,15 +86,13 @@ public class HiveSplitGenerator extends InputInitializer { private final MapWork work; private final SplitGrouper splitGrouper = new SplitGrouper(); private final SplitLocationProvider splitLocationProvider; - private boolean generateSingleSplit; - public HiveSplitGenerator(Configuration conf, MapWork work, final boolean generateSingleSplit) throws IOException { + public HiveSplitGenerator(Configuration conf, MapWork work) throws IOException { super(null); this.conf = conf; this.work = work; this.jobConf = new JobConf(conf); - this.generateSingleSplit = generateSingleSplit; // Assuming grouping enabled always. userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build(); @@ -204,27 +199,8 @@ public class HiveSplitGenerator extends InputInitializer { conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES, TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT); - InputSplit[] splits; - if (generateSingleSplit) { - splits = new InputSplit[1]; - List<Path> paths = Utilities.getInputPathsTez(jobConf, Utilities.getMapWork(jobConf)); - FileSystem fs = paths.get(0).getFileSystem(jobConf); - FileStatus[] fileStatuses = fs.listStatus(paths.get(0)); - FileStatus fileStatus = fileStatuses[0]; - Preconditions.checkState(paths.size() == 1 && fileStatuses.length == 1, "Requested to generate single " + - "split. Paths and fileStatuses are expected to be 1. Got paths: " + paths.size() + " fileStatuses: " + - fileStatuses.length); - BlockLocation[] locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); - Set<String> hostsSet = new HashSet<>(); - for (BlockLocation location : locations) { - hostsSet.addAll(Lists.newArrayList(location.getHosts())); - } - String[] hosts = hostsSet.toArray(new String[0]); - splits[0] = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts); - } else { - // Raw splits - splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves)); - } + // Raw splits + InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves)); // Sort the splits, so that subsequent grouping is consistent. Arrays.sort(splits, new InputSplitComparator()); LOG.info("Number of input splits: " + splits.length + ". " + availableSlots http://git-wip-us.apache.org/repos/asf/hive/blob/b1304672/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index a29b560..25a0ef2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; @@ -87,7 +88,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspecto import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SplitLocationInfo; @@ -130,7 +130,6 @@ public class GenericUDTFGetSplits extends GenericUDTF { protected transient IntObjectInspector intOI; protected transient JobConf jc; private boolean orderByQuery; - private boolean forceSingleSplit; private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); private DataOutput dos = new DataOutputStream(bos); @@ -205,12 +204,14 @@ public class GenericUDTFGetSplits extends GenericUDTF { TezWork tezWork = fragment.work; Schema schema = fragment.schema; - boolean generateSingleSplit = forceSingleSplit && orderByQuery; + if (orderByQuery) { + jc.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_LENGTH, false); + jc.setBoolean(TezSplitGrouper.TEZ_GROUPING_SPLIT_BY_COUNT, true); + jc.setInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_COUNT, 1); + } try { - InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId, generateSingleSplit); - LOG.info("Generated {} splits for query {}. orderByQuery: {} forceSingleSplit: {}", splits.length, query, - orderByQuery, forceSingleSplit); - if (generateSingleSplit && splits.length > 1) { + InputSplit[] splits = getSplits(jc, num, tezWork, schema, applicationId); + if (orderByQuery && splits.length > 1) { throw new HiveException("Got more than one split (Got: " + splits.length + ") for order by query: " + query); } for (InputSplit s : splits) { @@ -242,9 +243,6 @@ public class GenericUDTFGetSplits extends GenericUDTF { // Tez/LLAP requires RPC query plan HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true); HiveConf.setBoolVar(conf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, false); - // spark-llap always wraps query under a subquery, until that is removed from spark-llap - // hive compiler is going to remove inner order by. disable that optimization until then. - HiveConf.setBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY, false); try { jc = DagUtils.getInstance().createConfiguration(conf); @@ -269,9 +267,9 @@ public class GenericUDTFGetSplits extends GenericUDTF { } QueryPlan plan = driver.getPlan(); - orderByQuery = plan.getQueryProperties().hasOrderBy() || plan.getQueryProperties().hasOuterOrderBy(); - forceSingleSplit = orderByQuery && - HiveConf.getBoolVar(conf, ConfVars.LLAP_EXTERNAL_SPLITS_ORDER_BY_FORCE_SINGLE_SPLIT); + if (plan.getQueryProperties().hasOuterOrderBy()) { + orderByQuery = true; + } List<Task<?>> roots = plan.getRootTasks(); Schema schema = convertSchema(plan.getResultSchema()); if(num == 0) { @@ -367,8 +365,7 @@ public class GenericUDTFGetSplits extends GenericUDTF { } } - public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId, - final boolean generateSingleSplit) + public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId) throws IOException { if(numSplits == 0) { @@ -416,8 +413,10 @@ public class GenericUDTFGetSplits extends GenericUDTF { Preconditions.checkState(HiveConf.getBoolVar(wxConf, ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS)); - HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork, generateSingleSplit); + + HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork); List<Event> eventList = splitGenerator.initialize(); + InputSplit[] result = new InputSplit[eventList.size() - 1]; InputConfigureVertexTasksEvent configureEvent
