This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f6906f2af1 HIVE-25976: Cleaner may remove files being accessed from a 
fetch-task-converted reader (Laszlo Vegh, reviewed by Peter Vary, Krisztian 
Kasa)
f6906f2af1 is described below

commit f6906f2af1e1feefc6f5b6a392307d0fb655bb1c
Author: veghlaci05 <[email protected]>
AuthorDate: Wed Jun 29 11:52:36 2022 +0200

    HIVE-25976: Cleaner may remove files being accessed from a 
fetch-task-converted reader (Laszlo Vegh, reviewed by Peter Vary, Krisztian 
Kasa)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   4 +
 .../mapreduce/TestHCatMultiOutputFormat.java       |   1 +
 .../iceberg/mr/hive/TestHiveIcebergTimeTravel.java |  25 ++--
 .../hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java   |   1 +
 .../hive/minikdc/TestHs2HooksWithMiniKdc.java      |   1 +
 .../hive/minikdc/TestJdbcWithMiniKdcCookie.java    |   2 +
 .../apache/hive/minikdc/TestSSLWithMiniKdc.java    |   1 +
 .../parse/TestReplicationOnHDFSEncryptedZones.java |   7 +-
 .../hive/ql/txn/compactor/TestCompactor.java       |  40 ++++-
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  |  26 ++--
 .../org/apache/hadoop/hive/ql/exec/FetchTask.java  | 166 +++++++++++++--------
 .../hive/ql/optimizer/SimpleFetchOptimizer.java    |  34 +++--
 .../queries/clientpositive/nonmr_fetch_threshold.q |   1 +
 .../clientpositive/nonmr_fetch_threshold2.q        |  16 ++
 .../clientnegative/cluster_tasklog_retrieval.q.out |   2 +-
 .../results/clientnegative/udf_assert_true.q.out   |   2 +-
 .../results/clientnegative/udf_assert_true2.q.out  |   2 +-
 .../results/clientnegative/udf_reflect_neg.q.out   |   2 +-
 .../results/clientnegative/udf_test_error.q.out    |   2 +-
 .../llap/nonmr_fetch_threshold2.q.out              |  69 +++++++++
 20 files changed, 294 insertions(+), 110 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 67cfef75a3..50cfb85ba9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3503,6 +3503,10 @@ public class HiveConf extends Configuration {
         "1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only\n" +
         "2. more    : SELECT, FILTER, LIMIT only (support TABLESAMPLE and 
virtual columns)"
     ),
+    HIVEFETCHTASKCACHING("hive.fetch.task.caching", true,
+        "Enabling the caching of the result of fetch tasks eliminates the 
chance of running into a failing read." +
+            " On the other hand, if enabled, the 
hive.fetch.task.conversion.threshold must be adjusted accordingly. That" +
+            " is 1GB by default which must be lowered in case of enabled 
caching to prevent the consumption of too much memory."),
     HIVEFETCHTASKCONVERSIONTHRESHOLD("hive.fetch.task.conversion.threshold", 
1073741824L,
         "Input threshold for applying hive.fetch.task.conversion. If target 
table is native, input length\n" +
         "is calculated by summation of file lengths. If it's not native, 
storage handler for the table\n" +
diff --git 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
index c62d356ddd..93d1418afa 100644
--- 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
+++ 
b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
@@ -388,6 +388,7 @@ public class TestHCatMultiOutputFormat {
     conf.set("_hive.hdfs.session.path", "path");
     conf.set("_hive.local.session.path", "path");
     task.initialize(queryState, null, null, new 
org.apache.hadoop.hive.ql.Context(conf));
+    task.execute();
     task.fetch(temp);
     for (String str : temp) {
       results.add(str.replace("\t", ","));
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
index 233f87a857..13cfd975f4 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTimeTravel.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
 import java.util.List;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.Table;
 import org.junit.Assert;
@@ -50,10 +49,14 @@ public class TestHiveIcebergTimeTravel extends 
HiveIcebergStorageHandlerWithEngi
 
     Assert.assertEquals(4, rows.size());
 
-    AssertHelpers.assertThrows("should throw exception", 
IllegalArgumentException.class,
-        "java.lang.IllegalArgumentException: Cannot find a snapshot older than 
1970-01-01", () -> {
-        shell.executeStatement("SELECT * FROM customers FOR SYSTEM_TIME AS OF 
'1970-01-01 00:00:00'");
-        });
+    try {
+      shell.executeStatement("SELECT * FROM customers FOR SYSTEM_TIME AS OF 
'1970-01-01 00:00:00'");
+    } catch (Throwable e) {
+      while (e.getCause() != null) {
+        e = e.getCause();
+      }
+      Assert.assertTrue(e.getMessage().contains("Cannot find a snapshot older 
than 1970-01-01"));
+    }
   }
 
   @Test
@@ -73,10 +76,14 @@ public class TestHiveIcebergTimeTravel extends 
HiveIcebergStorageHandlerWithEngi
 
     Assert.assertEquals(4, rows.size());
 
-    AssertHelpers.assertThrows("should throw exception", 
IllegalArgumentException.class,
-        "Cannot find snapshot with ID 1234", () -> {
-          shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION 
AS OF 1234");
-        });
+    try {
+      shell.executeStatement("SELECT * FROM customers FOR SYSTEM_VERSION AS OF 
1234");
+    } catch (Throwable e) {
+      while (e.getCause() != null) {
+        e = e.getCause();
+      }
+      Assert.assertTrue(e.getMessage().contains("Cannot find snapshot with ID 
1234"));
+    }
   }
 
   @Test
diff --git 
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java
 
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java
index ca10a45732..fccf3e0209 100644
--- 
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java
+++ 
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/JdbcWithMiniKdcSQLAuthTest.java
@@ -59,6 +59,7 @@ public abstract class JdbcWithMiniKdcSQLAuthTest {
     hiveConf.setBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED, true);
     hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    hiveConf.setBoolVar(ConfVars.HIVEFETCHTASKCACHING, false);
 
     miniHS2 = MiniHiveKdc.getMiniHS2WithKerb(miniHiveKdc, hiveConf);
     miniHS2.start(new HashMap<String, String>());
diff --git 
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java
 
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java
index bb244a003e..890e4092ea 100644
--- 
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java
+++ 
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java
@@ -55,6 +55,7 @@ public class TestHs2HooksWithMiniKdc {
     confOverlay.put(ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
         SemanticAnalysisHook.class.getName());
     confOverlay.put(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "" + 
Boolean.FALSE);
+    confOverlay.put(ConfVars.HIVEFETCHTASKCACHING.varname, "" + false);
 
     miniHiveKdc = new MiniHiveKdc();
     HiveConf hiveConf = new HiveConf();
diff --git 
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdcCookie.java
 
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdcCookie.java
index 33189202ab..883d333dd4 100644
--- 
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdcCookie.java
+++ 
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdcCookie.java
@@ -81,6 +81,8 @@ public class TestJdbcWithMiniKdcCookie {
     hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE,
         1, TimeUnit.SECONDS);
     hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    hiveConf.setBoolVar(ConfVars.HIVEFETCHTASKCACHING, false);
+
     miniHS2 = MiniHiveKdc.getMiniHS2WithKerb(miniHiveKdc, hiveConf);
     miniHS2.start(new HashMap<String, String>());
   }
diff --git 
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
 
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
index f6869c31da..cea9503697 100644
--- 
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
+++ 
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
@@ -50,6 +50,7 @@ public class TestSSLWithMiniKdc {
 
     SSLTestUtils.setMetastoreSslConf(hiveConf);
     hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    hiveConf.setBoolVar(ConfVars.HIVEFETCHTASKCACHING, false);
 
     setHMSSaslConf(miniHiveKdc, hiveConf);
 
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
index 94324c1317..c33188ff16 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -152,8 +152,11 @@ public class TestReplicationOnHDFSEncryptedZones {
               .run("select value from encrypted_table")
               .verifyResults(new String[] { "value1", "value2" });
       Assert.fail("Src EZKey shouldn't be present on target");
-    } catch (IOException e) {
-      Assert.assertTrue(e.getCause().getMessage().contains("KeyVersion name 
'test_key@0' does not exist"));
+    } catch (Throwable e) {
+      while (e.getCause() != null) {
+        e = e.getCause();
+      }
+      Assert.assertTrue(e.getMessage().contains("KeyVersion name 'test_key@0' 
does not exist"));
     }
 
     //read should pass without raw-byte distcp
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 5673446be7..7f3c239b46 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -72,11 +72,13 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.Retry;
 import org.apache.hive.hcatalog.common.HCatUtil;
@@ -1025,6 +1027,36 @@ public class TestCompactor {
             Lists.newArrayList(5, 6), 1);
   }
 
+  @Test
+  public void majorCompactDuringFetchTaskConvertedRead() throws Exception {
+    driver.close();
+    driver = DriverFactory.newDriver(conf);
+    String dbName = "default";
+    String tblName = "cws";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+
+    executeStatementOnDriver(
+        "CREATE TABLE " + tblName + "(a INT, b STRING) " + " STORED AS ORC  
TBLPROPERTIES ('transactional'='true')",
+        driver);
+    executeStatementOnDriver("insert into " + tblName + " values (1, 'a')", 
driver);
+    executeStatementOnDriver("insert into " + tblName + " values (3, 'b')", 
driver);
+    executeStatementOnDriver("insert into " + tblName + " values (4, 'a')", 
driver);
+    executeStatementOnDriver("insert into " + tblName + " values (5, 'b')", 
driver);
+
+    CommandProcessorResponse resp = driver.run("select * from " + tblName + " 
LIMIT 5");
+    FetchTask ft = driver.getFetchTask();
+    ft.setMaxRows(1);
+    List res = new ArrayList();
+    ft.fetch(res);
+    assertEquals(1, res.size());
+
+    runMajorCompaction(dbName, tblName);
+    runCleaner(conf);
+
+    ft.fetch(res);
+    assertEquals(2, res.size());
+  }
+
   @Test
   public void testCleanAbortCompactAfter2ndCommitAbort() throws Exception {
     String dbName = "default";
@@ -1727,8 +1759,8 @@ public class TestCompactor {
     verifyHasBase(table.getSd(), fs, "base_0000005_v0000017");
     runCleaner(conf);
     // in case when we have # of accumulated entries for the same 
table/partition - we need to process them one-by-one in ASC order of write_id's,
-    // however, to support multi-threaded processing in the Cleaner, we have 
to move entries from the same group to the next Cleaner cycle, 
-    // so that they are not processed by multiple threads concurrently. 
+    // however, to support multi-threaded processing in the Cleaner, we have 
to move entries from the same group to the next Cleaner cycle,
+    // so that they are not processed by multiple threads concurrently.
     runCleaner(conf);
     verifyDeltaCount(table.getSd(), fs, 0);
   }
@@ -1742,7 +1774,9 @@ public class TestCompactor {
       StorageDescriptor sd, FileSystem fs, String name, PathFilter filter) 
throws Exception {
     FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
     for (FileStatus file : stat) {
-      if (name.equals(file.getPath().getName())) return;
+      if (name.equals(file.getPath().getName())) {
+        return;
+      }
     }
     Assert.fail("Cannot find " + name + ": " + Arrays.toString(stat));
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java 
b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index c2ecbf46bf..3565763038 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -203,6 +203,18 @@ public class Driver implements IDriver {
         perfLogger = SessionState.getPerfLogger(true);
       }
       execute();
+
+      FetchTask fetchTask = driverContext.getPlan().getFetchTask();
+      if (fetchTask != null) {
+        fetchTask.setTaskQueue(null);
+        fetchTask.setQueryPlan(null);
+        try {
+          fetchTask.execute();
+          driverContext.setFetchTask(fetchTask);
+        } catch (Throwable e) {
+          throw new CommandProcessorException(e);
+        }
+      }
       driverTxnHandler.handleTransactionAfterExecution();
 
       
driverContext.getQueryDisplay().setPerfLogStarts(QueryDisplay.Phase.EXECUTION, 
perfLogger.getStartTimes());
@@ -635,12 +647,10 @@ public class Driver implements IDriver {
     }
     if (isFetchingTable()) {
       try {
-        driverContext.getFetchTask().clearFetch();
+        driverContext.getFetchTask().resetFetch();
       } catch (Exception e) {
-        throw new IOException("Error closing the current fetch task", e);
+        throw new IOException("Error resetting the current fetch task", e);
       }
-      // FetchTask should not depend on the plan.
-      driverContext.getFetchTask().initialize(driverContext.getQueryState(), 
null, null, context);
     } else {
       context.resetStream();
       driverContext.setResStream(null);
@@ -782,14 +792,6 @@ public class Driver implements IDriver {
 
   private void releasePlan() {
     try {
-      if (driverContext.getPlan() != null) {
-        FetchTask fetchTask = driverContext.getPlan().getFetchTask();
-        if (fetchTask != null) {
-          fetchTask.setTaskQueue(null);
-          fetchTask.setQueryPlan(null);
-        }
-        driverContext.setFetchTask(fetchTask);
-      }
       driverContext.setPlan(null);
     } catch (Exception e) {
       LOG.debug("Exception while clearing the Fetch task", e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 64a9a1becd..61157c35a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -18,16 +18,11 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.TaskQueue;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.TaskQueue;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -40,6 +35,12 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * FetchTask implementation.
@@ -49,9 +50,12 @@ public class FetchTask extends Task<FetchWork> implements 
Serializable {
   private int maxRows = 100;
   private FetchOperator fetch;
   private ListSinkOperator sink;
+  private List fetchedData;
+  private int currentRow;
   private int totalRows;
-  private static transient final Logger LOG = 
LoggerFactory.getLogger(FetchTask.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FetchTask.class);
   JobConf job = null;
+  private boolean cachingEnabled;
 
   public FetchTask() {
     super();
@@ -66,32 +70,13 @@ public class FetchTask extends Task<FetchWork> implements 
Serializable {
     super.initialize(queryState, queryPlan, taskQueue, context);
     work.initializeForFetch(context.getOpContext());
 
+    cachingEnabled = HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.HIVEFETCHTASKCACHING);
+    fetchedData = new ArrayList<>();
+
     try {
       // Create a file system handle
-      if (job == null) {
-        // The job config should be initilaized once per fetch task. In case 
of refetch, we should use the
-        // same config.
-        job = new JobConf(conf);
-      }
-
-      Operator<?> source = work.getSource();
-      if (source instanceof TableScanOperator) {
-        TableScanOperator ts = (TableScanOperator) source;
-        // push down projections
-        ColumnProjectionUtils.appendReadColumns(job, ts.getNeededColumnIDs(), 
ts.getNeededColumns(),
-                ts.getNeededNestedColumnPaths(), 
ts.getConf().hasVirtualCols());
-        // push down filters and as of information
-        HiveInputFormat.pushFiltersAndAsOf(job, ts, null);
-
-        AcidUtils.setAcidOperationalProperties(job, 
ts.getConf().isTranscationalTable(),
-            ts.getConf().getAcidOperationalProperties());
-      }
-      sink = work.getSink();
-      fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
-      source.initialize(conf, new 
ObjectInspector[]{fetch.getOutputObjectInspector()});
-      totalRows = 0;
-      ExecMapper.setDone(false);
-
+      job = new JobConf(conf);
+      initFetch();
     } catch (Exception e) {
       // Bail out ungracefully - we should never hit
       // this here - but would have hit it in SemanticAnalyzer
@@ -102,14 +87,16 @@ public class FetchTask extends Task<FetchWork> implements 
Serializable {
 
   private List<VirtualColumn> getVirtualColumns(Operator<?> ts) {
     if (ts instanceof TableScanOperator && ts.getConf() != null) {
-      return ((TableScanOperator)ts).getConf().getVirtualCols();
+      return ((TableScanOperator) ts).getConf().getVirtualCols();
     }
     return null;
   }
 
   @Override
   public int execute() {
-    assert false;
+    if (cachingEnabled) {
+      executeInner(fetchedData);
+    }
     return 0;
   }
 
@@ -134,38 +121,17 @@ public class FetchTask extends Task<FetchWork> implements 
Serializable {
     this.maxRows = maxRows;
   }
 
-  public boolean fetch(List res) throws IOException {
-    sink.reset(res);
-    int rowsRet = work.getLeastNumRows();
-    if (rowsRet <= 0) {
-      rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, 
maxRows) : maxRows;
-    }
-    try {
-      if (rowsRet <= 0 || work.getLimit() == totalRows) {
-        fetch.clearFetchContext();
+  public boolean fetch(List res) {
+    if (cachingEnabled) {
+      if (currentRow >= fetchedData.size()) {
         return false;
       }
-      boolean fetched = false;
-      while (sink.getNumRows() < rowsRet) {
-        if (!fetch.pushRow()) {
-          if (work.getLeastNumRows() > 0) {
-            throw new HiveException("leastNumRows check failed");
-          }
-
-          // Closing the operator can sometimes yield more rows (HIVE-11892)
-          fetch.closeOperator();
-
-          return fetched;
-        }
-        fetched = true;
-      }
+      int toIndex = Math.min(fetchedData.size(), currentRow + maxRows);
+      res.addAll(fetchedData.subList(currentRow, toIndex));
+      currentRow = toIndex;
       return true;
-    } catch (IOException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new IOException(e);
-    } finally {
-      totalRows += sink.getNumRows();
+    } else {
+      return executeInner(res);
     }
   }
 
@@ -192,10 +158,82 @@ public class FetchTask extends Task<FetchWork> implements 
Serializable {
     if (fetch != null) {
       fetch.clearFetchContext();
     }
+    fetchedData.clear();
+  }
+
+  public void resetFetch() throws HiveException {
+    if (cachingEnabled) {
+      currentRow = 0;
+    } else {
+      clearFetch();
+      initFetch();
+    }
   }
 
   @Override
   public boolean canExecuteInParallel() {
     return false;
   }
-}
+
+  private boolean executeInner(List target) {
+    sink.reset(target);
+    int rowsRet;
+    if (cachingEnabled) {
+      rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
+    } else {
+      rowsRet = work.getLeastNumRows();
+      if (rowsRet <= 0) {
+        rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, 
maxRows) : maxRows;
+      }
+    }
+
+    try {
+      if (rowsRet <= 0 || work.getLimit() == totalRows) {
+        fetch.clearFetchContext();
+        return false;
+      }
+      boolean fetched = false;
+      while (sink.getNumRows() < rowsRet) {
+        if (!fetch.pushRow()) {
+          if (work.getLeastNumRows() > 0) {
+            throw new HiveException("leastNumRows check failed");
+          }
+
+          // Closing the operator can sometimes yield more rows (HIVE-11892)
+          fetch.closeOperator();
+
+          return fetched;
+        }
+        fetched = true;
+      }
+      return true;
+    } catch (Exception e) {
+      console.printError("Failed with exception " + e.getClass().getName() + 
":" + e.getMessage(),
+          "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new RuntimeException(e);
+    } finally {
+      totalRows += sink.getNumRows();
+    }
+  }
+
+  private void initFetch() throws HiveException {
+    Operator<?> source = work.getSource();
+    if (source instanceof TableScanOperator) {
+      TableScanOperator ts = (TableScanOperator) source;
+      // push down projections
+      ColumnProjectionUtils.appendReadColumns(job, ts.getNeededColumnIDs(), 
ts.getNeededColumns(),
+          ts.getNeededNestedColumnPaths(), ts.getConf().hasVirtualCols());
+      // push down filters and as of information
+      HiveInputFormat.pushFiltersAndAsOf(job, ts, null);
+
+      AcidUtils.setAcidOperationalProperties(job, 
ts.getConf().isTranscationalTable(),
+          ts.getConf().getAcidOperationalProperties());
+    }
+    sink = work.getSink();
+    fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
+    source.initialize(conf, new ObjectInspector[]{ 
fetch.getOutputObjectInspector() });
+    totalRows = 0;
+    ExecMapper.setDone(false);
+  }
+
+}
\ No newline at end of file
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index ee8cc28d63..5464b4e2d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -152,28 +152,32 @@ public class SimpleFetchOptimizer extends Transform {
   }
 
   private boolean checkThreshold(FetchData data, int limit, ParseContext pctx) 
throws Exception {
-    if (limit > 0) {
-      if (data.hasOnlyPruningFilter()) {
-        /* partitioned table + query has only pruning filters */
-        return true;
-      } else if (data.isPartitioned() == false && data.isFiltered() == false) {
-        /* unpartitioned table + no filters */
-        return true;
+    boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(), 
HiveConf.ConfVars.HIVEFETCHTASKCACHING);
+    if (!cachingEnabled) {
+      if (limit > 0) {
+        if (data.hasOnlyPruningFilter()) {
+          // partitioned table + query has only pruning filters
+          return true;
+        } else if (data.isPartitioned() == false && data.isFiltered() == 
false) {
+          // unpartitioned table + no filters
+          return true;
+        }
+        // fall through
+      }
+      Operator child = data.scanOp.getChildOperators().get(0);
+      if(child instanceof SelectOperator) {
+        // select *, constant and casts can be allowed without a threshold 
check
+        if (checkExpressions((SelectOperator)child)) {
+          return true;
+        }
       }
-      /* fall through */
     }
+    // if caching is enabled we apply the treshold in all cases
     long threshold = HiveConf.getLongVar(pctx.getConf(),
         HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
     if (threshold < 0) {
       return true;
     }
-    Operator child = data.scanOp.getChildOperators().get(0);
-    if(child instanceof SelectOperator) {
-      // select *, constant and casts can be allowed without a threshold check
-      if (checkExpressions((SelectOperator)child)) {
-        return true;
-      }
-    }
     return data.isDataLengthWithInThreshold(pctx, threshold);
   }
 
diff --git a/ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q 
b/ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q
index 6cc4543aab..6de2deafa1 100644
--- a/ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q
+++ b/ql/src/test/queries/clientpositive/nonmr_fetch_threshold.q
@@ -3,6 +3,7 @@
 set hive.fetch.task.conversion=more;
 set hive.explain.user=true;
 set hive.mapred.mode=nonstrict;
+set hive.fetch.task.caching=false;
 explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
 explain select cast(key as int) * 10, upper(value) from src limit 10;
 
diff --git a/ql/src/test/queries/clientpositive/nonmr_fetch_threshold2.q 
b/ql/src/test/queries/clientpositive/nonmr_fetch_threshold2.q
new file mode 100644
index 0000000000..7ead70e6e3
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/nonmr_fetch_threshold2.q
@@ -0,0 +1,16 @@
+--! qt:dataset:srcpart
+--! qt:dataset:src
+set hive.fetch.task.conversion=more;
+set hive.explain.user=true;
+set hive.mapred.mode=nonstrict;
+set hive.fetch.task.caching=true;
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+
+set hive.fetch.task.conversion.threshold=10000;
+
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
+
+-- with caching enabled, fetch task will be dropped, unlike when caching is 
not enabled (see: nonmr_fetch_threshold.q)
+set hive.fetch.task.conversion.threshold=100;
+
+explain select * from srcpart where ds='2008-04-08' AND hr='11' limit 10;
diff --git a/ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out 
b/ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out
index 8ba4013463..2bc328149b 100644
--- a/ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out
+++ b/ql/src/test/results/clientnegative/cluster_tasklog_retrieval.q.out
@@ -14,4 +14,4 @@ SELECT evaluate_npe(src.key) LIMIT 1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Error 
evaluating evaluate_npe(key)
+Failed with exception org.apache.hadoop.hive.ql.metadata.HiveException:Error 
evaluating evaluate_npe(key)
diff --git a/ql/src/test/results/clientnegative/udf_assert_true.q.out 
b/ql/src/test/results/clientnegative/udf_assert_true.q.out
index 2d6b9837d2..53ee2475ef 100644
--- a/ql/src/test/results/clientnegative/udf_assert_true.q.out
+++ b/ql/src/test/results/clientnegative/udf_assert_true.q.out
@@ -105,4 +105,4 @@ POSTHOOK: query: SELECT ASSERT_TRUE(x < 2) FROM src LATERAL 
VIEW EXPLODE(ARRAY(1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 #### A masked pattern was here ####
-Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
ASSERT_TRUE(): assertion failed.
+Failed with exception 
org.apache.hadoop.hive.ql.metadata.HiveException:ASSERT_TRUE(): assertion 
failed.
diff --git a/ql/src/test/results/clientnegative/udf_assert_true2.q.out 
b/ql/src/test/results/clientnegative/udf_assert_true2.q.out
index dc94fb1549..42f6c9959b 100644
--- a/ql/src/test/results/clientnegative/udf_assert_true2.q.out
+++ b/ql/src/test/results/clientnegative/udf_assert_true2.q.out
@@ -48,4 +48,4 @@ POSTHOOK: query: SELECT 1 + ASSERT_TRUE(x < 2) FROM src 
LATERAL VIEW EXPLODE(ARR
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 #### A masked pattern was here ####
-Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
ASSERT_TRUE(): assertion failed.
+Failed with exception 
org.apache.hadoop.hive.ql.metadata.HiveException:ASSERT_TRUE(): assertion 
failed.
diff --git a/ql/src/test/results/clientnegative/udf_reflect_neg.q.out 
b/ql/src/test/results/clientnegative/udf_reflect_neg.q.out
index a98811dba5..203a6326de 100644
--- a/ql/src/test/results/clientnegative/udf_reflect_neg.q.out
+++ b/ql/src/test/results/clientnegative/udf_reflect_neg.q.out
@@ -20,4 +20,4 @@ FROM src LIMIT 1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 #### A masked pattern was here ####
-Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: 
UDFReflect evaluate error while loading class 
java.lang.StringClassThatDoesNotExist
+Failed with exception 
org.apache.hadoop.hive.ql.metadata.HiveException:UDFReflect evaluate error 
while loading class java.lang.StringClassThatDoesNotExist
diff --git a/ql/src/test/results/clientnegative/udf_test_error.q.out 
b/ql/src/test/results/clientnegative/udf_test_error.q.out
index 65b39f1148..eff7b95c80 100644
--- a/ql/src/test/results/clientnegative/udf_test_error.q.out
+++ b/ql/src/test/results/clientnegative/udf_test_error.q.out
@@ -12,4 +12,4 @@ POSTHOOK: query: SELECT test_error(key < 125 OR key > 130) 
FROM src
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 #### A masked pattern was here ####
-Failed with exception 
java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Unable to 
execute method public int 
org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse.evaluate(java.lang.Boolean):UDFTestErrorOnFalse
 got b=false
+Failed with exception org.apache.hadoop.hive.ql.metadata.HiveException:Unable 
to execute method public int 
org.apache.hadoop.hive.ql.udf.UDFTestErrorOnFalse.evaluate(java.lang.Boolean):UDFTestErrorOnFalse
 got b=false
diff --git 
a/ql/src/test/results/clientpositive/llap/nonmr_fetch_threshold2.q.out 
b/ql/src/test/results/clientpositive/llap/nonmr_fetch_threshold2.q.out
new file mode 100644
index 0000000000..fc52f26be1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/nonmr_fetch_threshold2.q.out
@@ -0,0 +1,69 @@
+PREHOOK: query: explain select * from srcpart where ds='2008-04-08' AND 
hr='11' limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: explain select * from srcpart where ds='2008-04-08' AND 
hr='11' limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+Plan optimized by CBO.
+
+Stage-0
+  Fetch Operator
+    limit:10
+    Select Operator [SEL_2]
+      Output:["_col0","_col1","_col2","_col3"]
+      Limit [LIM_3]
+        Number of rows:10
+        TableScan [TS_0]
+          Output:["key","value"]
+
+PREHOOK: query: explain select * from srcpart where ds='2008-04-08' AND 
hr='11' limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: explain select * from srcpart where ds='2008-04-08' AND 
hr='11' limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+Plan optimized by CBO.
+
+Stage-0
+  Fetch Operator
+    limit:10
+    Select Operator [SEL_2]
+      Output:["_col0","_col1","_col2","_col3"]
+      Limit [LIM_3]
+        Number of rows:10
+        TableScan [TS_0]
+          Output:["key","value"]
+
+PREHOOK: query: explain select * from srcpart where ds='2008-04-08' AND 
hr='11' limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+POSTHOOK: query: explain select * from srcpart where ds='2008-04-08' AND 
hr='11' limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+#### A masked pattern was here ####
+Plan optimized by CBO.
+
+Stage-0
+  Fetch Operator
+    limit:10
+    Stage-1
+      Map 1 vectorized, llap
+      File Output Operator [FS_8]
+        Select Operator [SEL_7] (rows=10 width=358)
+          Output:["_col0","_col1","_col2","_col3"]
+          Limit [LIM_6] (rows=10 width=178)
+            Number of rows:10
+            TableScan [TS_0] (rows=500 width=178)
+              
default@srcpart,srcpart,Tbl:COMPLETE,Col:COMPLETE,Output:["key","value"]
+

Reply via email to