Repository: kudu Updated Branches: refs/heads/master 2e462afc8 -> 206a3f1f5
ITBLL: clean up exception handling Change-Id: I8fcd624e709d0f3a931055c1b4b38aab2d5b2e37 Reviewed-on: http://gerrit.cloudera.org:8080/6407 Reviewed-by: Jean-Daniel Cryans <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/206a3f1f Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/206a3f1f Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/206a3f1f Branch: refs/heads/master Commit: 206a3f1f567cca961bb71c3061883c0484e64d3e Parents: 2e462af Author: Dan Burkert <[email protected]> Authored: Wed Mar 15 12:26:55 2017 -0700 Committer: Jean-Daniel Cryans <[email protected]> Committed: Thu Mar 16 19:57:10 2017 +0000 ---------------------------------------------------------------------- .../tools/IntegrationTestBigLinkedList.java | 188 +++++++------------ 1 file changed, 67 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/206a3f1f/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java ---------------------------------------------------------------------- diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java index 6171026..0eb1543 100644 --- a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java +++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -34,8 +33,6 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -63,6 +60,8 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; @@ -73,6 +72,7 @@ import org.apache.kudu.client.AbstractKuduScannerBuilder; import org.apache.kudu.client.Bytes; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; import org.apache.kudu.client.KuduScanner; import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduTable; @@ -89,7 +89,7 @@ import org.apache.kudu.util.Pair; /** * <p> * This is an integration test borrowed from goraci, written by Keith Turner, - * which is in turn inspired by the Accumulo test called continous ingest (ci). + * which is in turn inspired by the Accumulo test called continuous ingest (ci). * The original source code can be found here: * </p> * <ul> @@ -199,7 +199,7 @@ import org.apache.kudu.util.Pair; * Delete - Disabled. A standalone program that deletes a single node * </li> * <li> - * Walker - Disabled. A standalong program that start following a linked list and emits timing + * Walker - Disabled. A standalone program that start following a linked list and emits timing * info. * </li> * </ul> @@ -219,7 +219,7 @@ import org.apache.kudu.util.Pair; * spread over the Long.MIN_VALUE - Long.MAX_VALUE keyspace. * </li> * <li> - * The Walker and Deleter progams were disabled to save some time but they can be re-enabled then + * The Walker and Deleter programs were disabled to save some time but they can be re-enabled then * ported to Kudu without too much effort. * </li> * </ul> @@ -370,7 +370,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { */ static class Generator extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(Generator.class); + private static final Logger LOG = LoggerFactory.getLogger(Generator.class); private CommandLineParser parser; private KuduClient client; @@ -435,7 +435,6 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { public boolean nextKeyValue() throws IOException, InterruptedException { return count++ < numNodes; } - } @Override @@ -510,17 +509,13 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { private int width; @Override - protected void setup(Context context) throws IOException, InterruptedException { + protected void setup(Context context) throws KuduException { id = "Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID(); Configuration conf = context.getConfiguration(); CommandLineParser parser = new CommandLineParser(conf); client = parser.getClient(); - try { - table = client.openTable(getTableName(conf)); - headsTable = client.openTable(getHeadsTable(conf)); - } catch (Exception e) { - throw new IOException(e); - } + table = client.openTable(getTableName(conf)); + headsTable = client.openTable(getHeadsTable(conf)); session = client.newSession(); session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); session.setMutationBufferSpace(WIDTH_DEFAULT); @@ -538,14 +533,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { + protected void cleanup(Context context) throws KuduException { session.close(); client.shutdown(); - } catch (Exception ex) { - // ugh. - throw new IOException(ex); - } } @Override @@ -578,12 +568,8 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { PartialRow row = insert.getRow(); row.addLong(COLUMN_KEY_ONE, Bytes.getLong(first[0])); row.addLong(COLUMN_KEY_TWO, Bytes.getLong(first[0], 8)); - try { - session.apply(insert); - session.flush(); - } catch (Exception e) { - throw new IOException("Couldn't flush the head row, " + insert, e); - } + session.apply(insert); + session.flush(); first = null; prev = null; @@ -599,46 +585,41 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { first[first.length - 1] = ez; } - private void persist(Context output, byte[][] data, boolean update) - throws IOException { - try { - for (int i = 0; i < data.length; i++) { - Operation put = update ? table.newUpdate() : table.newInsert(); - PartialRow row = put.getRow(); - - long keyOne = Bytes.getLong(data[i]); - long keyTwo = Bytes.getLong(data[i], 8); - - row.addLong(COLUMN_KEY_ONE, keyOne); - row.addLong(COLUMN_KEY_TWO, keyTwo); - - // prev is null for the first line, we'll update it at the end. - if (prev == null) { - row.setNull(COLUMN_PREV_ONE); - row.setNull(COLUMN_PREV_TWO); - } else { - row.addLong(COLUMN_PREV_ONE, Bytes.getLong(prev[i])); - row.addLong(COLUMN_PREV_TWO, Bytes.getLong(prev[i], 8)); - } + private void persist(Context output, byte[][] data, boolean update) throws KuduException { + for (int i = 0; i < data.length; i++) { + Operation put = update ? table.newUpdate() : table.newInsert(); + PartialRow row = put.getRow(); - if (!update) { - // We only add those for new inserts, we don't update the heads with a new row, etc. - row.addLong(COLUMN_ROW_ID, rowId + i); - row.addString(COLUMN_CLIENT, id); - row.addInt(COLUMN_UPDATE_COUNT, 0); - } - session.apply(put); + long keyOne = Bytes.getLong(data[i]); + long keyTwo = Bytes.getLong(data[i], 8); - if (i % 1000 == 0) { - // Tickle progress every so often else maprunner will think us hung - output.progress(); - } + row.addLong(COLUMN_KEY_ONE, keyOne); + row.addLong(COLUMN_KEY_TWO, keyTwo); + + // prev is null for the first line, we'll update it at the end. + if (prev == null) { + row.setNull(COLUMN_PREV_ONE); + row.setNull(COLUMN_PREV_TWO); + } else { + row.addLong(COLUMN_PREV_ONE, Bytes.getLong(prev[i])); + row.addLong(COLUMN_PREV_TWO, Bytes.getLong(prev[i], 8)); } - session.flush(); - } catch (Exception ex) { - throw new IOException(ex); + if (!update) { + // We only add those for new inserts, we don't update the heads with a new row, etc. + row.addLong(COLUMN_ROW_ID, rowId + i); + row.addString(COLUMN_CLIENT, id); + row.addInt(COLUMN_UPDATE_COUNT, 0); + } + session.apply(put); + + if (i % 1000 == 0) { + // Tickle progress every so often else maprunner will think us hung + output.progress(); + } } + + session.flush(); } } @@ -702,9 +683,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { BigInteger max = BigInteger.valueOf(Long.MAX_VALUE); BigInteger step = max.multiply(BigInteger.valueOf(2)) .divide(BigInteger.valueOf(numTablets)); - LOG.info(min.longValue()); - LOG.info(max.longValue()); - LOG.info(step.longValue()); + LOG.info("min: {}, max: {}, step: {}", min, max, step); PartialRow splitRow = schema.newPartialRow(); splitRow.addLong("key2", Long.MIN_VALUE); for (int i = 1; i < numTablets; i++) { @@ -790,7 +769,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { */ static class Verify extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(Verify.class); + private static final Logger LOG = LoggerFactory.getLogger(Verify.class); private static final BytesWritable DEF = new BytesWritable(NO_KEY); private static final Joiner COMMA_JOINER = Joiner.on(","); private static final byte[] rowKey = new byte[ROWKEY_LENGTH]; @@ -831,8 +810,6 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { public static class VerifyReducer extends Reducer<BytesWritable,BytesWritable,Text,Text> { private ArrayList<byte[]> refs = new ArrayList<byte[]>(); - private AtomicInteger rows = new AtomicInteger(0); - @Override public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { @@ -1002,7 +979,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { */ static class Loop extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(Loop.class); + private static final Logger LOG = LoggerFactory.getLogger(Loop.class); IntegrationTestBigLinkedList it; @@ -1190,7 +1167,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { */ private static class Updater extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(Updater.class); + private static final Logger LOG = LoggerFactory.getLogger(Updater.class); private static final String MAX_LINK_UPDATES_PER_MAPPER = "kudu.updates.per.mapper"; @@ -1218,10 +1195,10 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { * Schema we use when getting rows from the linked list, we only need the reference and * its update count. */ - private static final List<String> SCAN_COLUMN_NAMES = ImmutableList.of( - COLUMN_PREV_ONE, COLUMN_PREV_TWO, COLUMN_UPDATE_COUNT, COLUMN_CLIENT); + private static final List<String> SCAN_COLUMN_NAMES = + ImmutableList.of(COLUMN_PREV_ONE, COLUMN_PREV_TWO, COLUMN_UPDATE_COUNT, COLUMN_CLIENT); - private long numUpdatesPerMapper; + private int numUpdatesPerMapper; /** * Processing each linked list takes minutes, meaning that it's easily possible for our @@ -1231,22 +1208,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { private List<Pair<Long, Long>> headsCache; @Override - protected void setup(Context context) throws IOException, InterruptedException { + protected void setup(Context context) throws KuduException { Configuration conf = context.getConfiguration(); CommandLineParser parser = new CommandLineParser(conf); client = parser.getClient(); - try { - table = client.openTable(getTableName(conf)); - } catch (Exception e) { - throw new IOException("Couldn't open the linked list table", e); - } + table = client.openTable(getTableName(conf)); session = client.newSession(); - - Schema tableSchema = table.getSchema(); - - - numUpdatesPerMapper = conf.getLong(MAX_LINK_UPDATES_PER_MAPPER, 1); - headsCache = new ArrayList<Pair<Long, Long>>((int)numUpdatesPerMapper); + numUpdatesPerMapper = conf.getInt(MAX_LINK_UPDATES_PER_MAPPER, 1); + headsCache = new ArrayList<>(numUpdatesPerMapper); } @Override @@ -1340,19 +1309,11 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { /** * Finds the next node in the linked list. */ - private RowResult nextNode(long prevKeyOne, long prevKeyTwo) throws IOException { + private RowResult nextNode(long prevKeyOne, long prevKeyTwo) throws KuduException { KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table) - .setProjectedColumnNames(SCAN_COLUMN_NAMES); - + .setProjectedColumnNames(SCAN_COLUMN_NAMES); configureScannerForRandomRead(builder, table, prevKeyOne, prevKeyTwo); - - try { - return getOneRowResult(builder.build()); - } catch (Exception e) { - // Goes right out and fails the job. - throw new IOException("Couldn't read the following row: " + - getStringFromKeys(prevKeyOne, prevKeyTwo), e); - } + return getOneRowResult(builder.build()); } private void updateRow(long keyOne, long keyTwo, int newCount) throws IOException { @@ -1361,13 +1322,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { row.addLong(COLUMN_KEY_ONE, keyOne); row.addLong(COLUMN_KEY_TWO, keyTwo); row.addInt(COLUMN_UPDATE_COUNT, newCount); - try { - session.apply(update); - } catch (Exception e) { - // Goes right out and fails the job. - throw new IOException("Couldn't update the following row: " + - getStringFromKeys(keyOne, keyTwo), e); - } + session.apply(update); } /** @@ -1399,14 +1354,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - session.close(); - client.shutdown(); - } catch (Exception ex) { - // Goes right out and fails the job. - throw new IOException("Coulnd't close the scanner after the task completed", ex); - } + protected void cleanup(Context context) throws KuduException { + session.close(); + client.shutdown(); } } @@ -1554,15 +1504,11 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { System.out.println("Walking with " + getStringFromKeys(keyOne, keyTwo)); - try { - walk(keyOne, keyTwo, maxNumNodes); - } catch (Exception e) { - throw new IOException(e); - } + walk(keyOne, keyTwo, maxNumNodes); return 0; } - private void walk(long headKeyOne, long headKeyTwo, int maxNumNodes) throws Exception { + private void walk(long headKeyOne, long headKeyTwo, int maxNumNodes) throws KuduException { CommandLineParser parser = new CommandLineParser(getConf()); client = parser.getClient(); table = client.openTable(getTableName(getConf())); @@ -1591,7 +1537,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { maxNumNodes)); } - private RowResult nextNode(long keyOne, long keyTwo) throws Exception { + private RowResult nextNode(long keyOne, long keyTwo) throws KuduException { KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table); configureScannerForRandomRead(builder, table, keyOne, keyTwo); @@ -1646,14 +1592,14 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { return new StringBuilder().append(key1).append(",").append(key2).toString(); } - private static RowResult getOneRowResult(KuduScanner scanner) throws Exception { + private static RowResult getOneRowResult(KuduScanner scanner) throws KuduException { RowResultIterator rowResults; rowResults = scanner.nextRows(); if (rowResults.getNumRows() == 0) { return null; } if (rowResults.getNumRows() > 1) { - throw new Exception("Received too many rows from scanner " + scanner); + throw new RuntimeException("Received too many rows from scanner " + scanner); } return rowResults.next(); } @@ -1693,7 +1639,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { @Override public int run(String[] args) throws Exception { - Tool tool = null; + Tool tool; processOptions(args); if (toRun.equals("Generator")) { tool = new Generator();
