Repository: hive Updated Branches: refs/heads/master d43938ca1 -> dc4c66f6b
HIVE-13759: LlapTaskUmbilicalExternalClient should be closed by the record reader (Jason Dere, reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dc4c66f6 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dc4c66f6 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dc4c66f6 Branch: refs/heads/master Commit: dc4c66f6babf0d52344d52e9af37cdfa2ed42be0 Parents: d43938c Author: Jason Dere <[email protected]> Authored: Wed Jun 15 10:57:17 2016 -0700 Committer: Jason Dere <[email protected]> Committed: Wed Jun 15 10:57:17 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/llap/LlapBaseRecordReader.java | 26 ++++++++++++++++++-- .../ext/LlapTaskUmbilicalExternalClient.java | 3 ++- .../hadoop/hive/llap/LlapBaseInputFormat.java | 25 +++++-------------- .../hadoop/hive/llap/LlapRowInputFormat.java | 4 +-- .../hadoop/hive/llap/TestLlapOutputFormat.java | 2 +- 5 files changed, 35 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java index 3c858a8..f2700c8 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -52,14 +53,16 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor protected Thread readerThread = null; protected final LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>(); protected final long timeout; + protected final Closeable client; - public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz, JobConf job) { + public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz, JobConf job, Closeable client) { din = new DataInputStream(in); this.schema = schema; this.clazz = clazz; this.readerThread = Thread.currentThread(); this.timeout = 3 * HiveConf.getTimeVar(job, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + this.client = client; } public Schema getSchema() { @@ -68,7 +71,26 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor @Override public void close() throws IOException { - din.close(); + Exception caughtException = null; + try { + din.close(); + } catch (Exception err) { + LOG.error("Error closing input stream:" + err.getMessage(), err); + caughtException = err; + } + + if (client != null) { + try { + client.close(); + } catch (Exception err) { + LOG.error("Error closing client:" + err.getMessage(), err); + caughtException = (caughtException == null ? err : caughtException); + } + } + + if (caughtException != null) { + throw new IOException("Exception during close: " + caughtException.getMessage(), caughtException); + } } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index 85943d2..5f250b4 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.hive.llap.ext; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -58,7 +59,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class LlapTaskUmbilicalExternalClient extends AbstractService { +public class LlapTaskUmbilicalExternalClient extends AbstractService implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class); http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 4d17080..46030ec 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -99,9 +99,6 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; - private Connection con; - private Statement stmt; - public LlapBaseInputFormat(String url, String user, String pwd, String query) { this.url = url; this.user = user; @@ -172,7 +169,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> @SuppressWarnings("rawtypes") LlapBaseRecordReader recordReader = new LlapBaseRecordReader( - socket.getInputStream(), llapSplit.getSchema(), Text.class, job); + socket.getInputStream(), llapSplit.getSchema(), Text.class, job, llapClient); umbilicalResponder.setRecordReader(recordReader); return recordReader; } @@ -196,11 +193,12 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> throw new IOException(e); } - try { - con = DriverManager.getConnection(url,user,pwd); - stmt = con.createStatement(); - String sql = String.format(SPLIT_QUERY, query, numSplits); + String sql = String.format(SPLIT_QUERY, query, numSplits); + try ( + Connection con = DriverManager.getConnection(url,user,pwd); + Statement stmt = con.createStatement(); ResultSet res = stmt.executeQuery(sql); + ) { while (res.next()) { // deserialize split DataInput in = new DataInputStream(res.getBinaryStream(1)); @@ -208,23 +206,12 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> is.readFields(in); ins.add(is); } - - res.close(); - stmt.close(); } catch (Exception e) { throw new IOException(e); } return ins.toArray(new InputSplit[ins.size()]); } - public void close() { - try { - con.close(); - } catch (Exception e) { - // ignore - } - } - private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException { LlapRegistryService registryService = LlapRegistryService.getClient(job); String host = llapSplit.getLocations()[0]; http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java index 7efc711..c3001e9 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java @@ -35,9 +35,9 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; - public class LlapRowInputFormat implements InputFormat<NullWritable, Row> { - LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>(); + + private LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>(); @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/dc4c66f6/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java index 577037c..2288cd4 100644 --- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java @@ -103,7 +103,7 @@ public class TestLlapOutputFormat { writer.close(null); InputStream in = socket.getInputStream(); - LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job); + LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job, null); LOG.debug("Have record reader");
