This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f6906f2af1 HIVE-25976: Cleaner may remove files being accessed from a fetch-task-converted reader (Laszlo Vegh, reviewed by Peter Vary, Krisztian Kasa)
f6906f2af1 is described below
commit f6906f2af1e1feefc6f5b6a392307d0fb655bb1c
Author: veghlaci05 <[email protected]>
AuthorDate: Wed Jun 29 11:52:36 2022 +0200
    HIVE-25976: Cleaner may remove files being accessed from a fetch-task-converted reader (Laszlo Vegh, reviewed by Peter Vary, Krisztian Kasa)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../mapreduce/TestHCatMultiOutputFormat.java | 1 +
.../iceberg/mr/hive/TestHiveIcebergTimeTravel.java | 25 ++--
.../hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java | 1 +
.../hive/minikdc/TestHs2HooksWithMiniKdc.java | 1 +
.../hive/minikdc/TestJdbcWithMiniKdcCookie.java | 2 +
.../apache/hive/minikdc/TestSSLWithMiniKdc.java | 1 +
.../parse/TestReplicationOnHDFSEncryptedZones.java | 7 +-
.../hive/ql/txn/compactor/TestCompactor.java | 40 ++++-
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 26 ++--
.../org/apache/hadoop/hive/ql/exec/FetchTask.java | 166 +++++++++++++--------
.../hive/ql/optimizer/SimpleFetchOptimizer.java | 34 +++--
.../queries/clientpositive/nonmr_fetch_threshold.q | 1 +
.../clientpositive/nonmr_fetch_threshold2.q | 16 ++
.../clientnegative/cluster_tasklog_retrieval.q.out | 2 +-
.../results/clientnegative/udf_assert_true.q.out | 2 +-
.../results/clientnegative/udf_assert_true2.q.out | 2 +-
.../results/clientnegative/udf_reflect_neg.q.out | 2 +-
.../results/clientnegative/udf_test_error.q.out | 2 +-
.../llap/nonmr_fetch_threshold2.q.out | 69 +++++++++
20 files changed, 294 insertions(+), 110 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 67cfef75a3..50cfb85ba9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3503,6 +3503,10 @@ public class HiveConf extends Configuration {
        "1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only\n" +
        "2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)"
    ),
+    HIVEFETCHTASKCACHING("hive.fetch.task.caching", true,
+        "Enabling the caching of fetch task results eliminates the chance of running into a failing read." +
+        " On the other hand, if enabled, hive.fetch.task.conversion.threshold must be adjusted accordingly:" +
+        " its default of 1GB should be lowered to prevent the cached results from consuming too much memory."),
    HIVEFETCHTASKCONVERSIONTHRESHOLD("hive.fetch.task.conversion.threshold", 1073741824L,
        "Input threshold for applying hive.fetch.task.conversion. If target table is native, input length\n" +
        "is calculated by summation of file lengths. If it's not native, storage handler for the table\n" +
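
For reference, a minimal sketch (not part of this commit) of setting the new flag programmatically; the 128MB value below is an arbitrary illustration of lowering the 1GB default threshold when caching is enabled:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class FetchTaskCachingConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Cache fetch-task results so files removed later (e.g. by the compaction Cleaner) cannot break the read.
        conf.setBoolVar(HiveConf.ConfVars.HIVEFETCHTASKCACHING, true);
        // Cached rows live in memory, so lower the conversion threshold from its 1GB default
        // (128MB here is only an example value).
        conf.setLongVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD, 128L * 1024 * 1024);
      }
    }
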
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
index c62d356ddd..93d1418afa 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
@@ -388,6 +388,7 @@ public class TestHCatMultiOutputFormat {
conf.set("_hive.hdfs.session.path", "path");
conf.set("_hive.local.session.path", "path");
    task.initialize(queryState, null, null, new org.apache.hadoop.hive.ql.Context(conf));
+ task.execute();
task.fetch(temp);
for (String str : temp) {
results.add(str.replace("\t", ","));
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
index 233f87a857..13cfd975f4 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.util.List;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Table;
import org.junit.Assert;
@@ -50,10 +49,14 @@ public class TestHiveIcebergTimeTravel extends HiveIcebergStorageHandlerWithEngi
Assert.assertEquals(4, rows.size());
-    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-        "java.lang.IllegalArgumentException: Cannot find a snapshot older than 1970-01-01", () -> {
-          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_TIME AS OF '1970-01-01 00:00:00'");
-        });
+    try {
+      shell.executeStatement("SELECT * FROM customers FOR SYSTEM_TIME AS OF '1970-01-01 00:00:00'");
+    } catch (Throwable e) {
+      while (e.getCause() != null) {
+        e = e.getCause();
+      }
+      Assert.assertTrue(e.getMessage().contains("Cannot find a snapshot older than 1970-01-01"));
+    }
}
@Test
@@ -73,10 +76,14 @@ public class TestHiveIcebergTimeTravel extends HiveIcebergStorageHandlerWithEngi
Assert.assertEquals(4, rows.size());
-    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-        "Cannot find snapshot with ID 1234", () -> {
-          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 1234");
-        });
+    try {
+      shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 1234");
+    } catch (Throwable e) {
+      while (e.getCause() != null) {
+        e = e.getCause();
+      }
+      Assert.assertTrue(e.getMessage().contains("Cannot find snapshot with ID 1234"));
+    }
}
@Test
diff --git a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java
index ca10a45732..fccf3e0209 100644
--- a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java
+++ b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java
@@ -59,6 +59,7 @@ public abstract class JdbcWithMiniKdcSQLAuthTest {
hiveConf.setBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED, true);
hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ hiveConf.setBoolVar(ConfVars.HIVEFETCHTASKCACHING, false);
miniHS2 = MiniHiveKdc.getMiniHS2WithKerb(miniHiveKdc, hiveConf);
miniHS2.start(new HashMap<String, String>());
diff --git a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java
index bb244a003e..890e4092ea 100644
--- a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java
+++ b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java
@@ -55,6 +55,7 @@ public class TestHs2HooksWithMiniKdc {
    confOverlay.put(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, SemanticAnalysisHook.class.getName());
    confOverlay.put(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "" + Boolean.FALSE);
+ confOverlay.put(ConfVars.HIVEFETCHTASKCACHING.varname, "" + false);
miniHiveKdc = new MiniHiveKdc();
HiveConf hiveConf = new HiveConf();
diff --git a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdcCookie.java b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdcCookie.java
index 33189202ab..883d333dd4 100644
--- a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdcCookie.java
+++ b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdcCookie.java
@@ -81,6 +81,8 @@ public class TestJdbcWithMiniKdcCookie {
    hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE, 1, TimeUnit.SECONDS);
hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ hiveConf.setBoolVar(ConfVars.HIVEFETCHTASKCACHING, false);
+
miniHS2 = MiniHiveKdc.getMiniHS2WithKerb(miniHiveKdc, hiveConf);
miniHS2.start(new HashMap<String, String>());
}
diff --git a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
index f6869c31da..cea9503697 100644
--- a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
+++ b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
@@ -50,6 +50,7 @@ public class TestSSLWithMiniKdc {
SSLTestUtils.setMetastoreSslConf(hiveConf);
hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ hiveConf.setBoolVar(ConfVars.HIVEFETCHTASKCACHING, false);
setHMSSaslConf(miniHiveKdc, hiveConf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
index 94324c1317..c33188ff16 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -152,8 +152,11 @@ public class TestReplicationOnHDFSEncryptedZones {
.run("select value from encrypted_table")
.verifyResults(new String[] { "value1", "value2" });
Assert.fail("Src EZKey shouldn't be present on target");
- } catch (IOException e) {
-      Assert.assertTrue(e.getCause().getMessage().contains("KeyVersion name 'test_key@0' does not exist"));
+    } catch (Throwable e) {
+      while (e.getCause() != null) {
+        e = e.getCause();
+      }
+      Assert.assertTrue(e.getMessage().contains("KeyVersion name 'test_key@0' does not exist"));
}
//read should pass without raw-byte distcp
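
The updated test above, like the Iceberg time-travel tests earlier in this diff, unwraps the cause chain by hand before asserting on the root message. A small hypothetical helper (not part of this commit) capturing that pattern:

    import org.junit.Assert;

    public final class RootCauseAssertions {
      private RootCauseAssertions() {
      }

      /** Walks to the root cause and asserts that its message contains the expected fragment. */
      public static void assertRootCauseContains(Throwable t, String expectedFragment) {
        while (t.getCause() != null) {
          t = t.getCause();
        }
        Assert.assertTrue("Unexpected root cause message: " + t.getMessage(),
            t.getMessage() != null && t.getMessage().contains(expectedFragment));
      }
    }
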
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 5673446be7..7f3c239b46 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -72,11 +72,13 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.common.util.Retry;
import org.apache.hive.hcatalog.common.HCatUtil;
@@ -1025,6 +1027,36 @@ public class TestCompactor {
Lists.newArrayList(5, 6), 1);
}
+ @Test
+ public void majorCompactDuringFetchTaskConvertedRead() throws Exception {
+ driver.close();
+ driver = DriverFactory.newDriver(conf);
+ String dbName = "default";
+ String tblName = "cws";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+
+    executeStatementOnDriver(
+        "CREATE TABLE " + tblName + "(a INT, b STRING) " + " STORED AS ORC TBLPROPERTIES ('transactional'='true')",
+        driver);
+    executeStatementOnDriver("insert into " + tblName + " values (1, 'a')", driver);
+    executeStatementOnDriver("insert into " + tblName + " values (3, 'b')", driver);
+    executeStatementOnDriver("insert into " + tblName + " values (4, 'a')", driver);
+    executeStatementOnDriver("insert into " + tblName + " values (5, 'b')", driver);
+
+    CommandProcessorResponse resp = driver.run("select * from " + tblName + " LIMIT 5");
+ FetchTask ft = driver.getFetchTask();
+ ft.setMaxRows(1);
+ List res = new ArrayList();
+ ft.fetch(res);
+ assertEquals(1, res.size());
+
+ runMajorCompaction(dbName, tblName);
+ runCleaner(conf);
+
+ ft.fetch(res);
+ assertEquals(2, res.size());
+ }
+
@Test
public void testCleanAbortCompactAfter2ndCommitAbort() throws Exception {
String dbName = "default";
@@ -1727,8 +1759,8 @@ public class TestCompactor {
verifyHasBase(table.getSd(), fs, "base_0000005_v0000017");
runCleaner(conf);
    // in case when we have # of accumulated entries for the same table/partition - we need to process them one-by-one in ASC order of write_id's,
-    // however, to support multi-threaded processing in the Cleaner, we have to move entries from the same group to the next Cleaner cycle,
-    // so that they are not processed by multiple threads concurrently.
+    // however, to support multi-threaded processing in the Cleaner, we have to move entries from the same group to the next Cleaner cycle,
+ // so that they are not processed by multiple threads concurrently.
runCleaner(conf);
verifyDeltaCount(table.getSd(), fs, 0);
}
@@ -1742,7 +1774,9 @@ public class TestCompactor {
      StorageDescriptor sd, FileSystem fs, String name, PathFilter filter) throws Exception {
FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
for (FileStatus file : stat) {
- if (name.equals(file.getPath().getName())) return;
+ if (name.equals(file.getPath().getName())) {
+ return;
+ }
}
Assert.fail("Cannot find " + name + ": " + Arrays.toString(stat));
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index c2ecbf46bf..3565763038 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -203,6 +203,18 @@ public class Driver implements IDriver {
perfLogger = SessionState.getPerfLogger(true);
}
execute();
+
+ FetchTask fetchTask = driverContext.getPlan().getFetchTask();
+ if (fetchTask != null) {
+ fetchTask.setTaskQueue(null);
+ fetchTask.setQueryPlan(null);
+ try {
+ fetchTask.execute();
+ driverContext.setFetchTask(fetchTask);
+ } catch (Throwable e) {
+ throw new CommandProcessorException(e);
+ }
+ }
driverTxnHandler.handleTransactionAfterExecution();
    driverContext.getQueryDisplay().setPerfLogStarts(QueryDisplay.Phase.EXECUTION, perfLogger.getStartTimes());
@@ -635,12 +647,10 @@ public class Driver implements IDriver {
}
if (isFetchingTable()) {
try {
- driverContext.getFetchTask().clearFetch();
+ driverContext.getFetchTask().resetFetch();
} catch (Exception e) {
- throw new IOException("Error closing the current fetch task", e);
+ throw new IOException("Error resetting the current fetch task", e);
}
- // FetchTask should not depend on the plan.
-      driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, context);
} else {
context.resetStream();
driverContext.setResStream(null);
@@ -782,14 +792,6 @@ public class Driver implements IDriver {
private void releasePlan() {
try {
- if (driverContext.getPlan() != null) {
- FetchTask fetchTask = driverContext.getPlan().getFetchTask();
- if (fetchTask != null) {
- fetchTask.setTaskQueue(null);
- fetchTask.setQueryPlan(null);
- }
- driverContext.setFetchTask(fetchTask);
- }
driverContext.setPlan(null);
} catch (Exception e) {
LOG.debug("Exception while clearing the Fetch task", e);
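
With this change the Driver runs the FetchTask eagerly at the end of execution and keeps it reachable via getFetchTask(), so results can be paged from the in-memory cache even after the plan is released. A minimal consumption sketch, assuming a table named t exists and a session can be started (the table name and query are illustrative only):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.DriverFactory;
    import org.apache.hadoop.hive.ql.IDriver;
    import org.apache.hadoop.hive.ql.exec.FetchTask;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class CachedFetchSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.HIVEFETCHTASKCACHING, true);
        SessionState.start(conf);

        IDriver driver = DriverFactory.newDriver(conf);
        driver.run("select * from t limit 5");

        // The FetchTask has already been executed; fetch() now serves rows from the cache.
        FetchTask fetchTask = driver.getFetchTask();
        fetchTask.setMaxRows(2);
        List<Object> page = new ArrayList<>();
        while (fetchTask.fetch(page)) {
          // process the current page, then reuse the list for the next batch
          page.clear();
        }
        driver.close();
      }
    }
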
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 64a9a1becd..61157c35a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -18,16 +18,11 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.TaskQueue;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.TaskQueue;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -40,6 +35,12 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
/**
* FetchTask implementation.
@@ -49,9 +50,12 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
private int maxRows = 100;
private FetchOperator fetch;
private ListSinkOperator sink;
+ private List fetchedData;
+ private int currentRow;
private int totalRows;
-  private static transient final Logger LOG = LoggerFactory.getLogger(FetchTask.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FetchTask.class);
JobConf job = null;
+ private boolean cachingEnabled;
public FetchTask() {
super();
@@ -66,32 +70,13 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
super.initialize(queryState, queryPlan, taskQueue, context);
work.initializeForFetch(context.getOpContext());
+    cachingEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEFETCHTASKCACHING);
+ fetchedData = new ArrayList<>();
+
try {
// Create a file system handle
- if (job == null) {
-        // The job config should be initilaized once per fetch task. In case of refetch, we should use the
- // same config.
- job = new JobConf(conf);
- }
-
- Operator<?> source = work.getSource();
- if (source instanceof TableScanOperator) {
- TableScanOperator ts = (TableScanOperator) source;
- // push down projections
-        ColumnProjectionUtils.appendReadColumns(job, ts.getNeededColumnIDs(), ts.getNeededColumns(),
-            ts.getNeededNestedColumnPaths(), ts.getConf().hasVirtualCols());
-        // push down filters and as of information
-        HiveInputFormat.pushFiltersAndAsOf(job, ts, null);
-
-        AcidUtils.setAcidOperationalProperties(job, ts.getConf().isTranscationalTable(),
-            ts.getConf().getAcidOperationalProperties());
-      }
-      sink = work.getSink();
-      fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
-      source.initialize(conf, new ObjectInspector[]{fetch.getOutputObjectInspector()});
- totalRows = 0;
- ExecMapper.setDone(false);
-
+ job = new JobConf(conf);
+ initFetch();
} catch (Exception e) {
// Bail out ungracefully - we should never hit
// this here - but would have hit it in SemanticAnalyzer
@@ -102,14 +87,16 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
private List<VirtualColumn> getVirtualColumns(Operator<?> ts) {
if (ts instanceof TableScanOperator && ts.getConf() != null) {
- return ((TableScanOperator)ts).getConf().getVirtualCols();
+ return ((TableScanOperator) ts).getConf().getVirtualCols();
}
return null;
}
@Override
public int execute() {
- assert false;
+ if (cachingEnabled) {
+ executeInner(fetchedData);
+ }
return 0;
}
@@ -134,38 +121,17 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
this.maxRows = maxRows;
}
- public boolean fetch(List res) throws IOException {
- sink.reset(res);
- int rowsRet = work.getLeastNumRows();
- if (rowsRet <= 0) {
-      rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, maxRows) : maxRows;
- }
- try {
- if (rowsRet <= 0 || work.getLimit() == totalRows) {
- fetch.clearFetchContext();
+ public boolean fetch(List res) {
+ if (cachingEnabled) {
+ if (currentRow >= fetchedData.size()) {
return false;
}
- boolean fetched = false;
- while (sink.getNumRows() < rowsRet) {
- if (!fetch.pushRow()) {
- if (work.getLeastNumRows() > 0) {
- throw new HiveException("leastNumRows check failed");
- }
-
- // Closing the operator can sometimes yield more rows (HIVE-11892)
- fetch.closeOperator();
-
- return fetched;
- }
- fetched = true;
- }
+ int toIndex = Math.min(fetchedData.size(), currentRow + maxRows);
+ res.addAll(fetchedData.subList(currentRow, toIndex));
+ currentRow = toIndex;
return true;
- } catch (IOException e) {
- throw e;
- } catch (Exception e) {
- throw new IOException(e);
- } finally {
- totalRows += sink.getNumRows();
+ } else {
+ return executeInner(res);
}
}
@@ -192,10 +158,82 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
if (fetch != null) {
fetch.clearFetchContext();
}
+ fetchedData.clear();
+ }
+
+ public void resetFetch() throws HiveException {
+ if (cachingEnabled) {
+ currentRow = 0;
+ } else {
+ clearFetch();
+ initFetch();
+ }
}
@Override
public boolean canExecuteInParallel() {
return false;
}
-}
+
+ private boolean executeInner(List target) {
+ sink.reset(target);
+ int rowsRet;
+ if (cachingEnabled) {
+ rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
+ } else {
+ rowsRet = work.getLeastNumRows();
+ if (rowsRet <= 0) {
+        rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, maxRows) : maxRows;
+ }
+ }
+
+ try {
+ if (rowsRet <= 0 || work.getLimit() == totalRows) {
+ fetch.clearFetchContext();
+ return false;
+ }
+ boolean fetched = false;
+ while (sink.getNumRows() < rowsRet) {
+ if (!fetch.pushRow()) {
+ if (work.getLeastNumRows() > 0) {
+ throw new HiveException("leastNumRows check failed");
+ }
+
+ // Closing the operator can sometimes yield more rows (HIVE-11892)
+ fetch.closeOperator();
+
+ return fetched;
+ }
+ fetched = true;
+ }
+ return true;
+ } catch (Exception e) {
+      console.printError("Failed with exception " + e.getClass().getName() + ":" + e.getMessage(),
+ "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ } finally {
+ totalRows += sink.getNumRows();
+ }
+ }
+
+ private void initFetch() throws HiveException {
+ Operator<?> source = work.getSource();
+ if (source instanceof TableScanOperator) {
+ TableScanOperator ts = (TableScanOperator) source;
+ // push down projections
+      ColumnProjectionUtils.appendReadColumns(job, ts.getNeededColumnIDs(), ts.getNeededColumns(),
+          ts.getNeededNestedColumnPaths(), ts.getConf().hasVirtualCols());
+      // push down filters and as of information
+      HiveInputFormat.pushFiltersAndAsOf(job, ts, null);
+
+      AcidUtils.setAcidOperationalProperties(job, ts.getConf().isTranscationalTable(),
+          ts.getConf().getAcidOperationalProperties());
+    }
+    sink = work.getSink();
+    fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
+    source.initialize(conf, new ObjectInspector[]{ fetch.getOutputObjectInspector() });
+ totalRows = 0;
+ ExecMapper.setDone(false);
+ }
+
+}
\ No newline at end of file
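
A short sketch (under the stated assumption that the FetchTask came from a Driver run with hive.fetch.task.caching=true) showing that resetFetch() only rewinds the in-memory cursor, so the same cached result set can be replayed without touching the files again:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.FetchTask;

    public class ReplayCachedResultsSketch {
      // Drains all cached rows twice; the row counts of both passes should match.
      static int drainTwice(FetchTask fetchTask) throws Exception {
        List<Object> rows = new ArrayList<>();
        while (fetchTask.fetch(rows)) {
          // keep accumulating cached rows
        }
        int firstPass = rows.size();

        fetchTask.resetFetch(); // rewinds the cursor when caching is on; re-initializes the fetch otherwise
        rows.clear();
        while (fetchTask.fetch(rows)) {
          // second pass re-reads the same cached rows
        }
        return firstPass + rows.size();
      }
    }
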
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index ee8cc28d63..5464b4e2d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -152,28 +152,32 @@ public class SimpleFetchOptimizer extends Transform {
}
  private boolean checkThreshold(FetchData data, int limit, ParseContext pctx) throws Exception {
- if (limit > 0) {
- if (data.hasOnlyPruningFilter()) {
- /* partitioned table + query has only pruning filters */
- return true;
- } else if (data.isPartitioned() == false && data.isFiltered() == false) {
- /* unpartitioned table + no filters */
- return true;
+    boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEFETCHTASKCACHING);
+ if (!cachingEnabled) {
+ if (limit > 0) {
+ if (data.hasOnlyPruningFilter()) {
+ // partitioned table + query has only pruning filters
+ return true;
+        } else if (data.isPartitioned() == false && data.isFiltered() == false) {
+ // unpartitioned table + no filters
+ return true;
+ }
+ // fall through
+ }
+ Operator child = data.scanOp.getChildOperators().get(0);
+ if(child instanceof SelectOperator) {
+        // select *, constant and casts can be allowed without a threshold check
+ if (checkExpressions((SelectOperator)child)) {
+ return true;
+ }
}
- /* fall through */
}
+    // if caching is enabled, we apply the threshold in all cases
    long threshold = HiveConf.getLongVar(pctx.getConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
if (threshold < 0) {
return true;
}
- Operator child = data.scanOp.getChildOperators().get(0);
- if(child instanceof SelectOperator) {
- // select *, constant and casts can be allowed without a threshold check
- if (checkExpressions((SelectOperator)child)) {
- return true;
- }
- }
return data.isDataLengthWithInThreshold(pctx, threshold);
}
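
Restated in simplified form (not the actual Hive code): with caching enabled the input-size threshold is checked for every candidate query, whereas with caching disabled the trivially cheap shapes still bypass it:

    public final class FetchConversionDecisionSketch {
      private FetchConversionDecisionSketch() {
      }

      // Simplified restatement of checkThreshold(): without caching, trivially cheap query shapes
      // (pruning-only filters, unfiltered unpartitioned scans, plain projections) skip the size check;
      // with caching, the threshold always applies (a negative threshold disables the check).
      public static boolean convertToFetchTask(boolean cachingEnabled, boolean trivialShape,
          long inputLength, long threshold) {
        if (!cachingEnabled && trivialShape) {
          return true;
        }
        if (threshold < 0) {
          return true;
        }
        return inputLength <= threshold;
      }
    }
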
diff --git a/ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q b/ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q
index 6cc4543aab..6de2deafa1 100644
--- a/ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q
+++ b/ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q
@@ -3,6 +3,7 @@
set hive.fetch.task.conversion=more;
set hive.explain.user=true;
set hive.mapred.mode=nonstrict;
+set hive.fetch.task.caching=false;
explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
explain select cast(key as int) * 10, upper(value) from src limit 10;
diff --git a/ql/src/test/queries/clientpositive/nonmr_fetch_threshold2.q b/ql/src/test/queries/clientpositive/nonmr_fetch_threshold2.q
new file mode 100644
index 0000000000..7ead70e6e3
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/nonmr_fetch_threshold2.q
@@ -0,0 +1,16 @@
+--! qt:dataset:srcpart
+--! qt:dataset:src
+set hive.fetch.task.conversion=more;
+set hive.explain.user=true;
+set hive.mapred.mode=nonstrict;
+set hive.fetch.task.caching=true;
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+
+set hive.fetch.task.conversion.threshold=10000;
+
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+
+-- with caching enabled, fetch task will be dropped, unlike when caching is not enabled (see: nonmr_fetch_threshold.q)
+set hive.fetch.task.conversion.threshold=100;
+
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
diff --git a/ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out b/ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out
index 8ba4013463..2bc328149b 100644
--- a/ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out
+++ b/ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out
@@ -14,4 +14,4 @@ SELECT evaluate_npe(src.key) LIMIT 1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
POSTHOOK: Output: hdfs://### HDFS PATH ###
-Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Error evaluating evaluate_npe(key)
+Failed with exception org.apache.hadoop.hive.ql.metadata.HiveException:Error evaluating evaluate_npe(key)
diff --git a/ql/src/test/results/clientnegative/udf_assert_true.q.out b/ql/src/test/results/clientnegative/udf_assert_true.q.out
index 2d6b9837d2..53ee2475ef 100644
--- a/ql/src/test/results/clientnegative/udf_assert_true.q.out
+++ b/ql/src/test/results/clientnegative/udf_assert_true.q.out
@@ -105,4 +105,4 @@ POSTHOOK: query: SELECT ASSERT_TRUE(x < 2) FROM src LATERAL VIEW EXPLODE(ARRAY(1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
#### A masked pattern was here ####
-Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: ASSERT_TRUE(): assertion failed.
+Failed with exception org.apache.hadoop.hive.ql.metadata.HiveException:ASSERT_TRUE(): assertion failed.
diff --git a/ql/src/test/results/clientnegative/udf_assert_true2.q.out b/ql/src/test/results/clientnegative/udf_assert_true2.q.out
index dc94fb1549..42f6c9959b 100644
--- a/ql/src/test/results/clientnegative/udf_assert_true2.q.out
+++ b/ql/src/test/results/clientnegative/udf_assert_true2.q.out
@@ -48,4 +48,4 @@ POSTHOOK: query: SELECT 1 + ASSERT_TRUE(x < 2) FROM src LATERAL VIEW EXPLODE(ARR
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
#### A masked pattern was here ####
-Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: ASSERT_TRUE(): assertion failed.
+Failed with exception org.apache.hadoop.hive.ql.metadata.HiveException:ASSERT_TRUE(): assertion failed.
diff --git a/ql/src/test/results/clientnegative/udf_reflect_neg.q.out b/ql/src/test/results/clientnegative/udf_reflect_neg.q.out
index a98811dba5..203a6326de 100644
--- a/ql/src/test/results/clientnegative/udf_reflect_neg.q.out
+++ b/ql/src/test/results/clientnegative/udf_reflect_neg.q.out
@@ -20,4 +20,4 @@ FROM src LIMIT 1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
#### A masked pattern was here ####
-Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: UDFReflect evaluate error while loading class java.lang.StringClassThatDoesNotExist
+Failed with exception org.apache.hadoop.hive.ql.metadata.HiveException:UDFReflect evaluate error while loading class java.lang.StringClassThatDoesNotExist
diff --git a/ql/src/test/results/clientnegative/udf_test_error.q.out b/ql/src/test/results/clientnegative/udf_test_error.q.out
index 65b39f1148..eff7b95c80 100644
--- a/ql/src/test/results/clientnegative/udf_test_error.q.out
+++ b/ql/src/test/results/clientnegative/udf_test_error.q.out
@@ -12,4 +12,4 @@ POSTHOOK: query: SELECT test_error(key < 125 OR key > 130) FROM src
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
#### A masked pattern was here ####
-Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public int org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse.evaluate(java.lang.Boolean):UDFTestErrorOnFalse got b=false
+Failed with exception org.apache.hadoop.hive.ql.metadata.HiveException:Unable to execute method public int org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse.evaluate(java.lang.Boolean):UDFTestErrorOnFalse got b=false
diff --git a/ql/src/test/results/clientpositive/llap/nonmr_fetch_threshold2.q.out b/ql/src/test/results/clientpositive/llap/nonmr_fetch_threshold2.q.out
new file mode 100644
index 0000000000..fc52f26be1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/nonmr_fetch_threshold2.q.out
@@ -0,0 +1,69 @@
+PREHOOK: query: explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+Plan optimized by CBO.
+
+Stage-0
+ Fetch Operator
+ limit:10
+ Select Operator [SEL_2]
+ Output:["_col0","_col1","_col2","_col3"]
+ Limit [LIM_3]
+ Number of rows:10
+ TableScan [TS_0]
+ Output:["key","value"]
+
+PREHOOK: query: explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+Plan optimized by CBO.
+
+Stage-0
+ Fetch Operator
+ limit:10
+ Select Operator [SEL_2]
+ Output:["_col0","_col1","_col2","_col3"]
+ Limit [LIM_3]
+ Number of rows:10
+ TableScan [TS_0]
+ Output:["key","value"]
+
+PREHOOK: query: explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+Plan optimized by CBO.
+
+Stage-0
+ Fetch Operator
+ limit:10
+ Stage-1
+ Map 1 vectorized, llap
+ File Output Operator [FS_8]
+ Select Operator [SEL_7] (rows=10 width=358)
+ Output:["_col0","_col1","_col2","_col3"]
+ Limit [LIM_6] (rows=10 width=178)
+ Number of rows:10
+ TableScan [TS_0] (rows=500 width=178)
+                  default@srcpart,srcpart,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
+