Author: ctubbsii Date: Fri Jan 18 03:18:29 2013 New Revision: 1435013 URL: http://svn.apache.org/viewvc?rev=1435013&view=rev Log: ACCUMULO-955 Made BatchWriterConfig Writable, so it can be stored in a job's configuration in a human-readable way. Updated AccumuloOutputFormat to use it, and added unit tests for ACCUMULO-706 and ACCUMULO-955. Added an additional check for a reasonable minimum maxMemory value.
Added: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java?rev=1435013&r1=1435012&r2=1435013&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java Fri Jan 18 03:18:29 2013 @@ -16,24 +16,43 @@ */ package org.apache.accumulo.core.client; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringUtils; + /** * This object holds configuration settings used to instantiate a {@link BatchWriter} */ -public class BatchWriterConfig { - private long maxMemory = 50 * 1024 * 1024; - private long maxLatency = 120000; - private long timeout = Long.MAX_VALUE; - private int 
maxWriteThreads = 3; +public class BatchWriterConfig implements Writable { + + private static final Long DEFAULT_MAX_MEMORY = 50 * 1024 * 1024l; + private Long maxMemory = null; + + private static final Long DEFAULT_MAX_LATENCY = 2 * 60 * 1000l; + private Long maxLatency = null; + + private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE; + private Long timeout = null; + + private static final Integer DEFAULT_MAX_WRITE_THREADS = 3; + private Integer maxWriteThreads = null; /** * * @param maxMemory - * size in bytes of the maximum memory to batch before writing. Defaults to 50M. + * size in bytes of the maximum memory to batch before writing. Minimum 1K. Defaults to 50M. */ public BatchWriterConfig setMaxMemory(long maxMemory) { + if (maxMemory < 1024) + throw new IllegalArgumentException("Max memory is too low at " + maxMemory + ". Minimum 1K."); this.maxMemory = maxMemory; return this; } @@ -93,18 +112,76 @@ public class BatchWriterConfig { } public long getMaxMemory() { - return maxMemory; + return maxMemory != null ? maxMemory : DEFAULT_MAX_MEMORY; } public long getMaxLatency(TimeUnit timeUnit) { - return timeUnit.convert(maxLatency, TimeUnit.MILLISECONDS); + return timeUnit.convert(maxLatency != null ? maxLatency : DEFAULT_MAX_LATENCY, TimeUnit.MILLISECONDS); } public long getTimeout(TimeUnit timeUnit) { - return timeUnit.convert(timeout, TimeUnit.MILLISECONDS); + return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); } public int getMaxWriteThreads() { - return maxWriteThreads; + return maxWriteThreads != null ? 
maxWriteThreads : DEFAULT_MAX_WRITE_THREADS; + } + + @Override + public void write(DataOutput out) throws IOException { + // write this out in a human-readable way + ArrayList<String> fields = new ArrayList<String>(); + if (maxMemory != null) + addField(fields, "maxMemory", maxMemory); + if (maxLatency != null) + addField(fields, "maxLatency", maxLatency); + if (maxWriteThreads != null) + addField(fields, "maxWriteThreads", maxWriteThreads); + if (timeout != null) + addField(fields, "timeout", timeout); + String output = StringUtils.join(",", fields); + + byte[] bytes = output.getBytes(Charset.forName("UTF-8")); + byte[] len = String.format("%6s#", Integer.toString(bytes.length, 36)).getBytes("UTF-8"); + if (len.length != 7) + throw new IllegalStateException("encoded length does not match expected value"); + out.write(len); + out.write(bytes); + } + + private void addField(List<String> fields, String name, Object value) { + String key = StringUtils.escapeString(name, '\\', new char[] {',', '='}); + String val = StringUtils.escapeString(String.valueOf(value), '\\', new char[] {',', '='}); + fields.add(key + '=' + val); + } + + @Override + public void readFields(DataInput in) throws IOException { + byte[] len = new byte[7]; + in.readFully(len); + String strLen = new String(len, Charset.forName("UTF-8")); + if (!strLen.endsWith("#")) + throw new IllegalStateException("length was not encoded correctly"); + byte[] bytes = new byte[Integer.parseInt(strLen.substring(strLen.lastIndexOf(' ') + 1, strLen.length() - 1), 36)]; + in.readFully(bytes); + + String strFields = new String(bytes, Charset.forName("UTF-8")); + String[] fields = StringUtils.split(strFields, '\\', ','); + for (String field : fields) { + String[] keyValue = StringUtils.split(field, '\\', '='); + String key = keyValue[0]; + String value = keyValue[1]; + if ("maxMemory".equals(key)) { + maxMemory = Long.valueOf(value); + } else if ("maxLatency".equals(key)) { + maxLatency = Long.valueOf(value); + } else if 
("maxWriteThreads".equals(key)) { + maxWriteThreads = Integer.valueOf(value); + } else if ("timeout".equals(key)) { + timeout = Long.valueOf(value); + } else { + /* ignore any other properties */ + } + } } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1435013&r1=1435012&r2=1435013&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Fri Jan 18 03:18:29 2013 @@ -16,7 +16,12 @@ */ package org.apache.accumulo.core.client.mapreduce; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.nio.charset.Charset; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; @@ -141,7 +146,15 @@ public class AccumuloOutputFormat extend * @see #setMockInstance(Job, String) * @see #getInstance(JobContext) */ - private static final String MOCK = ".useMockInstance"; + private static final String MOCK = PREFIX + ".useMockInstance"; + + /** + * Key for storing the {@link BatchWriterConfig}. + * + * @see #setBatchWriterOptions(Job, BatchWriterConfig) + * @see #getBatchWriterOptions(JobContext) + */ + private static final String BATCH_WRITER_CONFIG = PREFIX + ".bwConfig"; /** * Key for storing the directive to create tables that don't exist @@ -232,35 +245,26 @@ public class AccumuloOutputFormat extend } /** + * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is + * used. 
Setting the configuration multiple times overwrites any previous configuration. + * + * @param job + * the Hadoop job instance to be configured + * @param bwConfig + * the configuration for the {@link BatchWriter} * @since 1.5.0 - * @see BatchWriterConfig#setMaxMemory(long) - */ - public static void setMaxMutationBufferSize(Job job, long numberOfBytes) { - job.getConfiguration().setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes); - } - - /** - * @since 1.5.0 - * @see BatchWriterConfig#setMaxLatency(long, TimeUnit) - */ - public static void setMaxLatency(Job job, int numberOfMilliseconds) { - job.getConfiguration().setInt(MAX_LATENCY, numberOfMilliseconds); - } - - /** - * @since 1.5.0 - * @see BatchWriterConfig#setMaxWriteThreads(int) - */ - public static void setMaxWriteThreads(Job job, int numberOfThreads) { - job.getConfiguration().setInt(NUM_WRITE_THREADS, numberOfThreads); - } - - /** - * @since 1.5.0 - * @see BatchWriterConfig#setTimeout(long, TimeUnit) */ - public static void setTimeout(Job job, long time, TimeUnit timeUnit) { - job.getConfiguration().setLong(TIMEOUT, timeUnit.toMillis(time)); + public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + String serialized; + try { + bwConfig.write(new DataOutputStream(baos)); + serialized = new String(baos.toByteArray(), Charset.forName("UTF-8")); + baos.close(); + } catch (IOException e) { + throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName()); + } + job.getConfiguration().set(BATCH_WRITER_CONFIG, serialized); } /** @@ -363,55 +367,29 @@ public class AccumuloOutputFormat extend } /** - * Gets the corresponding {@link BatchWriterConfig} setting. 
- * - * @param context - * the Hadoop context for the configured job - * @return the max memory to use (in bytes) - * @since 1.5.0 - * @see #setMaxMutationBufferSize(Job, long) - */ - protected static long getMaxMutationBufferSize(JobContext context) { - return context.getConfiguration().getLong(MAX_MUTATION_BUFFER_SIZE, new BatchWriterConfig().getMaxMemory()); - } - - /** - * Gets the corresponding {@link BatchWriterConfig} setting. - * - * @param context - * the Hadoop context for the configured job - * @return the max latency to use (in millis) - * @since 1.5.0 - * @see #setMaxLatency(Job, int) - */ - protected static long getMaxLatency(JobContext context) { - return context.getConfiguration().getLong(MAX_LATENCY, new BatchWriterConfig().getMaxLatency(TimeUnit.MILLISECONDS)); - } - - /** - * Gets the corresponding {@link BatchWriterConfig} setting. - * - * @param context - * the Hadoop context for the configured job - * @return the max write threads to use - * @since 1.5.0 - * @see #setMaxWriteThreads(Job, int) - */ - protected static int getMaxWriteThreads(JobContext context) { - return context.getConfiguration().getInt(NUM_WRITE_THREADS, new BatchWriterConfig().getMaxWriteThreads()); - } - - /** - * Gets the corresponding {@link BatchWriterConfig} setting. + * Gets the {@link BatchWriterConfig} settings. 
* * @param context * the Hadoop context for the configured job - * @return the timeout for write operations + * @return the configuration object * @since 1.5.0 - * @see #setTimeout(Job, long, TimeUnit) + * @see #setBatchWriterOptions(Job, BatchWriterConfig) */ - protected static long getTimeout(JobContext context) { - return context.getConfiguration().getLong(TIMEOUT, Long.MAX_VALUE); + protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { + String serialized = context.getConfiguration().get(BATCH_WRITER_CONFIG); + BatchWriterConfig bwConfig = new BatchWriterConfig(); + if (serialized == null || serialized.isEmpty()) { + return bwConfig; + } else { + try { + ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(Charset.forName("UTF-8"))); + bwConfig.readFields(new DataInputStream(bais)); + bais.close(); + return bwConfig; + } catch (IOException e) { + throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName()); + } + } } /** @@ -475,9 +453,7 @@ public class AccumuloOutputFormat extend if (!simulate) { this.conn = getInstance(context).getConnector(getUsername(context), getPassword(context)); - mtbw = conn.createMultiTableBatchWriter(new BatchWriterConfig().setMaxMemory(getMaxMutationBufferSize(context)) - .setMaxLatency(getMaxLatency(context), TimeUnit.MILLISECONDS).setMaxWriteThreads(getMaxWriteThreads(context)) - .setTimeout(getTimeout(context), TimeUnit.MILLISECONDS)); + mtbw = conn.createMultiTableBatchWriter(getBatchWriterOptions(context)); } } @@ -658,12 +634,6 @@ public class AccumuloOutputFormat extend private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads"; /** - * @deprecated since 1.5.0; - */ - @Deprecated - private static final String TIMEOUT = PREFIX + ".timeout"; - - /** * @deprecated since 1.5.0; Use {@link #setOutputInfo(Job, String, byte[], boolean, String)} instead. 
*/ @Deprecated @@ -705,7 +675,7 @@ public class AccumuloOutputFormat extend } /** - * @deprecated since 1.5.0; Use {@link #setMaxMutationBufferSize(Job, long)} instead. + * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead. */ @Deprecated public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) { @@ -713,7 +683,7 @@ public class AccumuloOutputFormat extend } /** - * @deprecated since 1.5.0; Use {@link #setMaxLatency(Job, int)} instead. + * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead. */ @Deprecated public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) { @@ -721,7 +691,7 @@ public class AccumuloOutputFormat extend } /** - * @deprecated since 1.5.0; Use {@link #setMaxWriteThreads(Job, int)} instead. + * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead. */ @Deprecated public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) { @@ -788,7 +758,7 @@ public class AccumuloOutputFormat extend } /** - * @deprecated since 1.5.0; Use {@link #getMaxMutationBufferSize(JobContext)} instead. + * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead. */ @Deprecated protected static long getMaxMutationBufferSize(Configuration conf) { @@ -796,7 +766,7 @@ public class AccumuloOutputFormat extend } /** - * @deprecated since 1.5.0; Use {@link #getMaxLatency(JobContext)} instead. + * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead. */ @Deprecated protected static int getMaxLatency(Configuration conf) { @@ -806,7 +776,7 @@ public class AccumuloOutputFormat extend } /** - * @deprecated since 1.5.0; Use {@link #getMaxWriteThreads(JobContext)} instead. + * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead. 
*/ @Deprecated protected static int getMaxWriteThreads(Configuration conf) { Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java?rev=1435013&r1=1435012&r2=1435013&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java Fri Jan 18 03:18:29 2013 @@ -47,8 +47,9 @@ public class DeleteCommand extends Comma return Long.MAX_VALUE; } - public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, - IOException, ConstraintViolationException { + @Override + public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, IOException, ConstraintViolationException { shellState.checkTableState(); final Mutation m = new Mutation(new Text(cl.getArgs()[0].getBytes(Shell.CHARSET))); @@ -68,7 +69,7 @@ public class DeleteCommand extends Comma m.putDelete(colf, colq); } final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(), - new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS)); + new BatchWriterConfig().setMaxMemory(Math.max(m.estimatedMemoryUsed(), 1024)).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS)); bw.addMutation(m); bw.close(); return 0; @@ -100,7 +101,7 @@ public class DeleteCommand extends Comma "time before insert should fail if no data is written. If no unit is given assumes seconds. 
Units d,h,m,s,and ms are supported. e.g. 30s or 100ms"); timeoutOption.setArgName("timeout"); o.addOption(timeoutOption); - + return o; } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java?rev=1435013&r1=1435012&r2=1435013&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java Fri Jan 18 03:18:29 2013 @@ -54,9 +54,10 @@ public class InsertCommand extends Comma return Long.MAX_VALUE; } - - public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, - IOException, ConstraintViolationException { + + @Override + public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, IOException, ConstraintViolationException { shellState.checkTableState(); final Mutation m = new Mutation(new Text(cl.getArgs()[0].getBytes(Shell.CHARSET))); @@ -78,7 +79,7 @@ public class InsertCommand extends Comma m.put(colf, colq, val); final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(), - new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS)); + new BatchWriterConfig().setMaxMemory(Math.max(m.estimatedMemoryUsed(), 1024)).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS)); bw.addMutation(m); try { bw.close(); @@ -103,7 +104,7 @@ public class InsertCommand extends Comma if (e.getCause() != null) lines.add(" Caused 
by : " + e.getCause().getClass().getName() + " : " + e.getCause().getMessage()); } - + shellState.printLines(lines.iterator(), false); } return 0; @@ -134,7 +135,7 @@ public class InsertCommand extends Comma "time before insert should fail if no data is written. If no unit is given assumes seconds. Units d,h,m,s,and ms are supported. e.g. 30s or 100ms"); timeoutOption.setArgName("timeout"); o.addOption(timeoutOption); - + return o; } Added: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java?rev=1435013&view=auto ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java (added) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java Fri Jan 18 03:18:29 2013 @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +/** + * + */ +public class BatchWriterConfigTest { + + @Test + public void testReasonableDefaults() { + long expectedMaxMemory = 50 * 1024 * 1024l; + long expectedMaxLatency = 120000l; + long expectedTimeout = Long.MAX_VALUE; + int expectedMaxWriteThreads = 3; + + BatchWriterConfig defaults = new BatchWriterConfig(); + assertEquals(expectedMaxMemory, defaults.getMaxMemory()); + assertEquals(expectedMaxLatency, defaults.getMaxLatency(TimeUnit.MILLISECONDS)); + assertEquals(expectedTimeout, defaults.getTimeout(TimeUnit.MILLISECONDS)); + assertEquals(expectedMaxWriteThreads, defaults.getMaxWriteThreads()); + } + + @Test + public void testOverridingDefaults() { + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxMemory(1123581321l); + bwConfig.setMaxLatency(22, TimeUnit.HOURS); + bwConfig.setTimeout(33, TimeUnit.DAYS); + bwConfig.setMaxWriteThreads(42); + + assertEquals(1123581321l, bwConfig.getMaxMemory()); + assertEquals(22 * 60 * 60 * 1000l, bwConfig.getMaxLatency(TimeUnit.MILLISECONDS)); + assertEquals(33 * 24 * 60 * 60 * 1000l, bwConfig.getTimeout(TimeUnit.MILLISECONDS)); + assertEquals(42, bwConfig.getMaxWriteThreads()); + } + + @Test + public void testZeroTimeoutAndLatency() { + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxLatency(0, TimeUnit.MILLISECONDS); + bwConfig.setTimeout(0, TimeUnit.MILLISECONDS); + + assertEquals(Long.MAX_VALUE, bwConfig.getMaxLatency(TimeUnit.MILLISECONDS)); + assertEquals(Long.MAX_VALUE, bwConfig.getTimeout(TimeUnit.MILLISECONDS)); + } + + @Test(expected = 
IllegalArgumentException.class) + public void testMaxMemoryTooLow() { + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxMemory(1024 - 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeMaxLatency() { + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxLatency(-1, TimeUnit.DAYS); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeTimeout() { + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setTimeout(-1, TimeUnit.DAYS); + } + + @Test(expected = IllegalArgumentException.class) + public void testZeroMaxWriteThreads() { + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxWriteThreads(0); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeMaxWriteThreads() { + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxWriteThreads(-1); + } + + @Test + public void testSerialize() throws IOException { + // make sure we aren't testing defaults + final BatchWriterConfig bwDefaults = new BatchWriterConfig(); + assertNotEquals(7654321l, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS)); + assertNotEquals(9898989l, bwDefaults.getTimeout(TimeUnit.MILLISECONDS)); + assertNotEquals(42, bwDefaults.getMaxWriteThreads()); + assertNotEquals(1123581321l, bwDefaults.getMaxMemory()); + + // test setting all fields + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxLatency(7654321l, TimeUnit.MILLISECONDS); + bwConfig.setTimeout(9898989l, TimeUnit.MILLISECONDS); + bwConfig.setMaxWriteThreads(42); + bwConfig.setMaxMemory(1123581321l); + byte[] bytes = createBytes(bwConfig); + checkBytes(bwConfig, bytes); + + // test human-readable serialization + bwConfig = new BatchWriterConfig(); + bwConfig.setMaxWriteThreads(42); + bytes = createBytes(bwConfig); + assertEquals(" i#maxWriteThreads=42", new String(bytes, Charset.forName("UTF-8"))); + checkBytes(bwConfig, bytes); + + // test 
human-readable with 2 fields + bwConfig = new BatchWriterConfig(); + bwConfig.setMaxWriteThreads(24); + bwConfig.setTimeout(3, TimeUnit.SECONDS); + bytes = createBytes(bwConfig); + assertEquals(" v#maxWriteThreads=24,timeout=3000", new String(bytes, Charset.forName("UTF-8"))); + checkBytes(bwConfig, bytes); + } + + private byte[] createBytes(BatchWriterConfig bwConfig) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + bwConfig.write(new DataOutputStream(baos)); + return baos.toByteArray(); + } + + private void checkBytes(BatchWriterConfig bwConfig, byte[] bytes) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + BatchWriterConfig createdConfig = new BatchWriterConfig(); + createdConfig.readFields(new DataInputStream(bais)); + + assertEquals(bwConfig.getMaxMemory(), createdConfig.getMaxMemory()); + assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS), createdConfig.getMaxLatency(TimeUnit.MILLISECONDS)); + assertEquals(bwConfig.getTimeout(TimeUnit.MILLISECONDS), createdConfig.getTimeout(TimeUnit.MILLISECONDS)); + assertEquals(bwConfig.getMaxWriteThreads(), createdConfig.getMaxWriteThreads()); + } + +} Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java?rev=1435013&r1=1435012&r2=1435013&view=diff ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java (original) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java Fri Jan 18 03:18:29 2013 @@ -18,12 +18,14 @@ package org.apache.accumulo.core.client. 
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Iterator; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.BatchWriter; @@ -39,6 +41,7 @@ import org.apache.accumulo.core.util.Cac import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -120,6 +123,46 @@ public class AccumuloOutputFormatTest { } @Test + public void testBWSettings() throws IOException { + Job job = new Job(); + + // make sure we aren't testing defaults + final BatchWriterConfig bwDefaults = new BatchWriterConfig(); + assertNotEquals(7654321l, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS)); + assertNotEquals(9898989l, bwDefaults.getTimeout(TimeUnit.MILLISECONDS)); + assertNotEquals(42, bwDefaults.getMaxWriteThreads()); + assertNotEquals(1123581321l, bwDefaults.getMaxMemory()); + + final BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxLatency(7654321l, TimeUnit.MILLISECONDS); + bwConfig.setTimeout(9898989l, TimeUnit.MILLISECONDS); + bwConfig.setMaxWriteThreads(42); + bwConfig.setMaxMemory(1123581321l); + AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig); + + AccumuloOutputFormat myAOF = new AccumuloOutputFormat() { + @Override + public void checkOutputSpecs(JobContext job) throws IOException { + BatchWriterConfig bwOpts = getBatchWriterOptions(job); + + // passive check + assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS), bwOpts.getMaxLatency(TimeUnit.MILLISECONDS)); + 
assertEquals(bwConfig.getTimeout(TimeUnit.MILLISECONDS), bwOpts.getTimeout(TimeUnit.MILLISECONDS)); + assertEquals(bwConfig.getMaxWriteThreads(), bwOpts.getMaxWriteThreads()); + assertEquals(bwConfig.getMaxMemory(), bwOpts.getMaxMemory()); + + // explicit check + assertEquals(7654321l, bwOpts.getMaxLatency(TimeUnit.MILLISECONDS)); + assertEquals(9898989l, bwOpts.getTimeout(TimeUnit.MILLISECONDS)); + assertEquals(42, bwOpts.getMaxWriteThreads()); + assertEquals(1123581321l, bwOpts.getMaxMemory()); + + } + }; + myAOF.checkOutputSpecs(job); + } + + @Test public void testMR() throws Exception { MockInstance mockInstance = new MockInstance("testmrinstance"); Connector c = mockInstance.getConnector("root", new byte[] {}); Modified: accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java?rev=1435013&r1=1435012&r2=1435013&view=diff ============================================================================== --- accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java (original) +++ accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java Fri Jan 18 03:18:29 2013 @@ -26,6 +26,7 @@ import java.util.List; import java.util.Random; import org.apache.accumulo.core.cli.ClientOnRequiredTable; +import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; @@ -383,7 +384,8 @@ public class TeraSortIngest extends Conf job.setOutputFormatClass(AccumuloOutputFormat.class); opts.setAccumuloConfigs(job); - AccumuloOutputFormat.setMaxMutationBufferSize(job, 10L * 1000 * 1000); + BatchWriterConfig bwConfig = new 
BatchWriterConfig().setMaxMemory(10L * 1000 * 1000); + AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig); Configuration conf = job.getConfiguration(); conf.setLong(NUMROWS, opts.numRows); Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java?rev=1435013&r1=1435012&r2=1435013&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java Fri Jan 18 03:18:29 2013 @@ -149,9 +149,7 @@ public class ContinuousMoru extends Conf job.setNumReduceTasks(0); job.setOutputFormatClass(AccumuloOutputFormat.class); - AccumuloOutputFormat.setMaxLatency(job, Integer.parseInt(String.valueOf(bwOpts.batchLatency))); - AccumuloOutputFormat.setMaxMutationBufferSize(job, bwOpts.batchMemory); - AccumuloOutputFormat.setMaxWriteThreads(job, bwOpts.batchThreads); + AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig()); Configuration conf = job.getConfiguration(); conf.setLong(MIN, opts.min);