Repository: kudu Updated Branches: refs/heads/master 391e3255d -> 1277f69a1
KUDU-2249 Avoid sharing the client between the InputFormat and RecordReader This commit prevents a possible race condition between the getSplits() method and the TableRecordReader in the KuduTableInputFormat, when both try to access and shut down the KuduClient. Both share the same client and shut it down after use. In some scenarios the client might still be accessed after it has been shut down, which throws an error. So the TableRecordReader gets its own client with this commit. This increases the number of Kudu clients opened by an MR application by at most one (the one that was shared by getSplits() with a TableRecordReader). Also clarified the behaviour of MR applications and how many open Kudu clients one might have to expect in total. Change-Id: I24f45ee9253790c5348cabd0afe6c6a4b6d3f3d4 Reviewed-on: http://gerrit.cloudera.org:8080/8921 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7f4157c4 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7f4157c4 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7f4157c4 Branch: refs/heads/master Commit: 7f4157c416c1f17e6326753f2138bd17fa79ad66 Parents: 391e325 Author: Clemens Valiente <[email protected]> Authored: Thu Dec 28 10:34:26 2017 +0100 Committer: David Ribeiro Alves <[email protected]> Committed: Mon Jan 8 18:57:11 2018 +0000 ---------------------------------------------------------------------- .../kudu/mapreduce/KuduTableInputFormat.java | 66 +++++++++++++------- 1 file changed, 45 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/7f4157c4/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java ---------------------------------------------------------------------- diff --git 
a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java index fcbf10e..a018ae2 100644 --- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java +++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java @@ -70,9 +70,24 @@ import org.apache.kudu.client.RowResultIterator; * * <p> * Hadoop doesn't have the concept of "closing" the input format so in order to release the - * resources we assume that once either {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)} - * or {@link KuduTableInputFormat.TableRecordReader#close()} have been called that - * the object won't be used again and the AsyncKuduClient is shut down. + * resources (mainly, the Kudu client) we assume that once either + * {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)} + * or {@link KuduTableInputFormat.TableRecordReader#close()} + * have been called that the object won't be used again and the AsyncKuduClient is shut down. + * + * To prevent a premature shutdown of the client, the KuduTableInputFormat and the + * TableRecordReader both get their own client that they don't share. + * </p> + * + * <p> + * Default behavior of hadoop is to call {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)} + * in the MRAppMaster and for each inputSplit (in our case, Kudu tablet) will spawn one Mapper + * with a TableRecordReader reading one Tablet. + * + * Therefore, total number of Kudu clients opened over the course of a MR application can be + * estimated by (#Tablets +1). To reduce the number of concurrent open clients, it might be + * advisable to restrict resources of the MR application or implement the + * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} over this InputFormat. 
* </p> */ @InterfaceAudience.Public @@ -161,15 +176,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult> } return splits; } finally { - shutdownClient(); - } - } - - private void shutdownClient() throws IOException { - try { - client.shutdown(); - } catch (Exception e) { - throw new IOException(e); + shutdownClient(client); } } @@ -214,12 +221,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult> String tableName = conf.get(INPUT_TABLE_KEY); String masterAddresses = conf.get(MASTER_ADDRESSES_KEY); - this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY, - AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS); - this.client = new KuduClient.KuduClientBuilder(masterAddresses) - .defaultOperationTimeoutMs(operationTimeoutMs) - .build(); - KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(client); + this.client = buildKuduClient(); this.nameServer = conf.get(NAME_SERVER_KEY); this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false); this.isFaultTolerant = conf.getBoolean(FAULT_TOLERANT_SCAN, false); @@ -263,6 +265,26 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult> } } + private KuduClient buildKuduClient() { + + String masterAddresses = conf.get(MASTER_ADDRESSES_KEY); + this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY, + AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS); + KuduClient kuduClient = new KuduClient.KuduClientBuilder(masterAddresses) + .defaultOperationTimeoutMs(operationTimeoutMs) + .build(); + KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(kuduClient); + return kuduClient; + } + + private void shutdownClient(KuduClient kuduClient) throws IOException { + try { + kuduClient.shutdown(); + } catch (Exception e) { + throw new IOException(e); + } + } + /** * Given a PTR string generated via reverse DNS lookup, return everything * except the trailing period. 
Example for host.example.com., return @@ -384,6 +406,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult> private RowResultIterator iterator; private KuduScanner scanner; private TableSplit split; + private KuduClient kuduClient; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) @@ -393,9 +416,10 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult> } split = (TableSplit) inputSplit; + kuduClient = buildKuduClient(); LOG.debug("Creating scanner for token: {}", - KuduScanToken.stringifySerializedToken(split.getScanToken(), client)); - scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), client); + KuduScanToken.stringifySerializedToken(split.getScanToken(), kuduClient)); + scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), kuduClient); // Calling this now to set iterator. tryRefreshIterator(); @@ -452,7 +476,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult> } catch (Exception e) { throw new IOException(e); } - shutdownClient(); + shutdownClient(kuduClient); } } }
