This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new ed505d0 Create types in ClientProperty. Closes #778 (#779) ed505d0 is described below commit ed505d076071c792ca97441d17ae2ebd00c34a17 Author: Mike Miller <mmil...@apache.org> AuthorDate: Wed Nov 28 15:35:57 2018 -0500 Create types in ClientProperty. Closes #778 (#779) * Fixes precision with BW config. Closes #778 * Added property types to ClientProperty * Renamed batch writer props for consistency with server properties * Add default to durability PropertyType bounds * Use separate getter and setter methods for different types --- .../accumulo/core/client/BatchWriterConfig.java | 23 ++-- .../core/clientImpl/AccumuloClientImpl.java | 15 ++- .../accumulo/core/clientImpl/ClientContext.java | 6 +- .../apache/accumulo/core/conf/ClientProperty.java | 131 +++++++++++++-------- .../apache/accumulo/core/conf/PropertyType.java | 2 +- .../core/clientImpl/ClientConfConverterTest.java | 2 +- .../accumulo/core/conf/ClientPropertyTest.java | 28 +++++ .../mapreduce/lib/OutputConfigurator.java | 32 ++--- 8 files changed, 152 insertions(+), 87 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java index 1508935..a260921 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java +++ b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java @@ -17,10 +17,10 @@ package org.apache.accumulo.core.client; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC; -import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES; -import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC; -import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS; +import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_LATENCY_MAX; +import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MEMORY_MAX; +import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_THREADS_MAX; +import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_TIMEOUT_MAX; import java.io.DataInput; import java.io.DataOutput; @@ -30,6 +30,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.clientImpl.DurabilityImpl; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringUtils; @@ -41,30 +42,30 @@ import org.apache.hadoop.util.StringUtils; */ public class BatchWriterConfig implements Writable { - private static final Long DEFAULT_MAX_MEMORY = Long - .parseLong(BATCH_WRITER_MAX_MEMORY_BYTES.getDefaultValue()); + private static final Long DEFAULT_MAX_MEMORY = ConfigurationTypeHelper + .getMemoryAsBytes(BATCH_WRITER_MEMORY_MAX.getDefaultValue()); private Long maxMemory = null; - private static final Long DEFAULT_MAX_LATENCY = TimeUnit.MILLISECONDS - .convert(Long.parseLong(BATCH_WRITER_MAX_LATENCY_SEC.getDefaultValue()), TimeUnit.SECONDS); + private static final Long DEFAULT_MAX_LATENCY = ConfigurationTypeHelper + .getTimeInMillis(BATCH_WRITER_LATENCY_MAX.getDefaultValue()); private Long maxLatency = null; private static final Long DEFAULT_TIMEOUT = getDefaultTimeout(); private Long timeout = null; private static final Integer DEFAULT_MAX_WRITE_THREADS = Integer - .parseInt(BATCH_WRITER_MAX_WRITE_THREADS.getDefaultValue()); + .parseInt(BATCH_WRITER_THREADS_MAX.getDefaultValue()); private Integer maxWriteThreads = null; private Durability durability = Durability.DEFAULT; private boolean isDurabilitySet = false; private static Long getDefaultTimeout() { - Long def = Long.parseLong(BATCH_WRITER_MAX_TIMEOUT_SEC.getDefaultValue()); + Long def = ConfigurationTypeHelper.getTimeInMillis(BATCH_WRITER_TIMEOUT_MAX.getDefaultValue()); if (def.equals(0L)) return Long.MAX_VALUE; else - return TimeUnit.MILLISECONDS.convert(def, TimeUnit.SECONDS); + return def; } /** diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloClientImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloClientImpl.java index 83e9250..6cac797 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloClientImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloClientImpl.java @@ -356,7 +356,7 @@ public class AccumuloClientImpl implements AccumuloClient { @Override public ConnectionOptions zkTimeout(int timeout) { - setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, Integer.toString(timeout) + "ms"); + ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.setTimeInMillis(properties, (long) timeout); return this; } @@ -374,13 +374,12 @@ public class AccumuloClientImpl implements AccumuloClient { @Override public ConnectionOptions batchWriterConfig(BatchWriterConfig batchWriterConfig) { - setProperty(ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES, batchWriterConfig.getMaxMemory()); - setProperty(ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC, - batchWriterConfig.getMaxLatency(TimeUnit.SECONDS)); - setProperty(ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC, - batchWriterConfig.getTimeout(TimeUnit.SECONDS)); - setProperty(ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS, - batchWriterConfig.getMaxWriteThreads()); + ClientProperty.BATCH_WRITER_MEMORY_MAX.setBytes(properties, batchWriterConfig.getMaxMemory()); + ClientProperty.BATCH_WRITER_LATENCY_MAX.setTimeInMillis(properties, + batchWriterConfig.getMaxLatency(TimeUnit.MILLISECONDS)); + ClientProperty.BATCH_WRITER_TIMEOUT_MAX.setTimeInMillis(properties, + batchWriterConfig.getTimeout(TimeUnit.MILLISECONDS)); + setProperty(ClientProperty.BATCH_WRITER_THREADS_MAX, batchWriterConfig.getMaxWriteThreads()); setProperty(ClientProperty.BATCH_WRITER_DURABILITY, batchWriterConfig.getDurability().toString()); return this; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 28395a5..e41e81e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -247,15 +247,15 @@ public class ClientContext { if (batchWriterConfig == null) { Properties props = info.getProperties(); batchWriterConfig = new BatchWriterConfig(); - Long maxMemory = ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES.getLong(props); + Long maxMemory = ClientProperty.BATCH_WRITER_MEMORY_MAX.getBytes(props); if (maxMemory != null) { batchWriterConfig.setMaxMemory(maxMemory); } - Long maxLatency = ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC.getLong(props); + Long maxLatency = ClientProperty.BATCH_WRITER_LATENCY_MAX.getTimeInMillis(props); if (maxLatency != null) { batchWriterConfig.setMaxLatency(maxLatency, TimeUnit.SECONDS); } - Long timeout = ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC.getLong(props); + Long timeout = ClientProperty.BATCH_WRITER_TIMEOUT_MAX.getTimeInMillis(props); if (timeout != null) { batchWriterConfig.setTimeout(timeout, TimeUnit.SECONDS); } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java index a1ef2c5..219906a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.core.conf; +import static com.google.common.base.Preconditions.checkState; + import java.io.File; import java.io.IOException; import java.util.Base64; @@ -37,55 +39,64 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public enum ClientProperty { // Instance - INSTANCE_NAME("instance.name", "", "Name of Accumulo instance to connect to", "", true), - INSTANCE_ZOOKEEPERS("instance.zookeepers", "localhost:2181", - "Zookeeper connection information for Accumulo instance", "", true), - INSTANCE_ZOOKEEPERS_TIMEOUT("instance.zookeepers.timeout", "30s", "Zookeeper session timeout"), + INSTANCE_NAME("instance.name", "", PropertyType.STRING, + "Name of Accumulo instance to " + "connect to", "2.0.0", true), + INSTANCE_ZOOKEEPERS("instance.zookeepers", "localhost:2181", PropertyType.HOSTLIST, + "Zookeeper connection information for Accumulo instance", "2.0.0", true), + INSTANCE_ZOOKEEPERS_TIMEOUT("instance.zookeepers.timeout", "30s", PropertyType.TIMEDURATION, + "Zookeeper session timeout", "2.0.0", false), // Authentication - AUTH_TYPE("auth.type", "password", - "Authentication method (i.e password, kerberos, PasswordToken, KerberosToken, etc)", "", + AUTH_TYPE("auth.type", "password", PropertyType.STRING, + "Authentication method (i.e password, kerberos, PasswordToken, KerberosToken, etc)", "2.0.0", true), - AUTH_PRINCIPAL("auth.principal", "", - "Accumulo principal/username for chosen authentication method", "", true), - AUTH_TOKEN("auth.token", "", "Authentication token (ex. mypassword, /path/to/keytab)", "", true), + AUTH_PRINCIPAL("auth.principal", "", PropertyType.STRING, + "Accumulo principal/username for chosen authentication method", "2.0.0", true), + AUTH_TOKEN("auth.token", "", PropertyType.STRING, + "Authentication token (ex. mypassword, /path/to/keytab)", "2.0.0", true), // BatchWriter - BATCH_WRITER_MAX_MEMORY_BYTES("batch.writer.max.memory.bytes", "52428800", - "Max memory (in bytes) to batch before writing"), - BATCH_WRITER_MAX_LATENCY_SEC("batch.writer.max.latency.sec", "120", - "Max amount of time (in seconds) to hold data in memory before flushing it"), - BATCH_WRITER_MAX_TIMEOUT_SEC("batch.writer.max.timeout.sec", "0", + BATCH_WRITER_MEMORY_MAX("batch.writer.memory.max", "50M", PropertyType.BYTES, + "Max memory (in bytes) to batch before writing", "2.0.0", false), + BATCH_WRITER_LATENCY_MAX("batch.writer.latency.max", "120s", PropertyType.TIMEDURATION, + "Max amount of time (in seconds) to hold data in memory before flushing it", "2.0.0", false), + BATCH_WRITER_TIMEOUT_MAX("batch.writer.timeout.max", "0", PropertyType.TIMEDURATION, "Max amount of time (in seconds) an unresponsive server will be re-tried. An" - + " exception is thrown when this timeout is exceeded. Set to zero for no timeout."), - BATCH_WRITER_MAX_WRITE_THREADS("batch.writer.max.write.threads", "3", - "Maximum number of threads to use for writing data to tablet servers."), - BATCH_WRITER_DURABILITY("batch.writer.durability", "default", Property.TABLE_DURABILITY - .getDescription() + " Setting this property will " - + "change the durability for the BatchWriter session. A value of \"default\" will use the " - + "table's durability setting. "), + + " exception is thrown when this timeout is exceeded. Set to zero for no timeout.", + "2.0.0", false), + BATCH_WRITER_THREADS_MAX("batch.writer.threads.max", "3", PropertyType.COUNT, + "Maximum number of threads to use for writing data to tablet servers.", "2.0.0", false), + BATCH_WRITER_DURABILITY("batch.writer.durability", "default", PropertyType.DURABILITY, + Property.TABLE_DURABILITY.getDescription() + " Setting this property will " + + "change the durability for the BatchWriter session. A value of \"default\" will" + + " use the table's durability setting. ", + "2.0.0", false), // Scanner - SCANNER_BATCH_SIZE("scanner.batch.size", "1000", - "Number of key/value pairs that will be fetched at time from tablet server"), + SCANNER_BATCH_SIZE("scanner.batch.size", "1000", PropertyType.COUNT, + "Number of key/value pairs that will be fetched at time from tablet server", "2.0.0", false), // BatchScanner - BATCH_SCANNER_NUM_QUERY_THREADS("batch.scanner.num.query.threads", "3", - "Number of concurrent query threads to spawn for querying"), + BATCH_SCANNER_NUM_QUERY_THREADS("batch.scanner.num.query.threads", "3", PropertyType.COUNT, + "Number of concurrent query threads to spawn for querying", "2.0.0", false), // Bulk load BULK_LOAD_THREADS("bulk.threads", ImportMappingOptions.BULK_LOAD_THREADS_DEFAULT, + PropertyType.COUNT, "The number of threads used to inspect bulk load files to determine where files go. " + "If the value ends with C, then it will be multiplied by the number of cores on the " - + "system. This property is only used by the bulk import API introduced in 2.0.0."), + + "system. This property is only used by the bulk import API introduced in 2.0.0.", + "2.0.0", false), // SSL SSL_ENABLED("ssl.enabled", "false", "Enable SSL for client RPC"), SSL_KEYSTORE_PASSWORD("ssl.keystore.password", "", "Password used to encrypt keystore"), - SSL_KEYSTORE_PATH("ssl.keystore.path", "", "Path to SSL keystore file"), + SSL_KEYSTORE_PATH("ssl.keystore.path", "", PropertyType.PATH, "Path to SSL keystore file", + "2.0.0", false), SSL_KEYSTORE_TYPE("ssl.keystore.type", "jks", "Type of SSL keystore"), SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password", "", "Password used to encrypt truststore"), - SSL_TRUSTSTORE_PATH("ssl.truststore.path", "", "Path to SSL truststore file"), + SSL_TRUSTSTORE_PATH("ssl.truststore.path", "", PropertyType.PATH, "Path to SSL truststore file", + "2.0.0", false), SSL_TRUSTSTORE_TYPE("ssl.truststore.type", "jks", "Type of SSL truststore"), SSL_USE_JSSE("ssl.use.jsse", "false", "Use JSSE system properties to configure SSL"), @@ -99,36 +110,30 @@ public enum ClientProperty { // Trace TRACE_SPAN_RECEIVERS("trace.span.receivers", "org.apache.accumulo.tracer.ZooTraceClient", "A list of span receiver classes to send trace spans"), - TRACE_ZOOKEEPER_PATH("trace.zookeeper.path", Constants.ZTRACERS, - "The zookeeper node where tracers are registered"); + TRACE_ZOOKEEPER_PATH("trace.zookeeper.path", Constants.ZTRACERS, PropertyType.PATH, + "The zookeeper node where tracers are registered", "2.0.0", false); public static final String TRACE_SPAN_RECEIVER_PREFIX = "trace.span.receiver"; private String key; private String defaultValue; + private PropertyType type; private String description; private String since; private boolean required; - ClientProperty(String key, String defaultValue, String description, String since, - boolean required) { - Objects.requireNonNull(key); - Objects.requireNonNull(defaultValue); - Objects.requireNonNull(description); - Objects.requireNonNull(since); - this.key = key; - this.defaultValue = defaultValue; - this.description = description; - this.since = since; + ClientProperty(String key, String defaultValue, PropertyType type, String description, + String since, boolean required) { + this.key = Objects.requireNonNull(key); + this.defaultValue = Objects.requireNonNull(defaultValue); + this.type = Objects.requireNonNull(type); + this.description = Objects.requireNonNull(description); + this.since = Objects.requireNonNull(since); this.required = required; } - ClientProperty(String key, String defaultValue, String description, String since) { - this(key, defaultValue, description, since, false); - } - ClientProperty(String key, String defaultValue, String description) { - this(key, defaultValue, description, ""); + this(key, defaultValue, PropertyType.STRING, description, "", false); } public String getKey() { @@ -139,6 +144,10 @@ public enum ClientProperty { return defaultValue; } + public PropertyType getType() { + return type; + } + public String getDescription() { return description; } @@ -161,6 +170,10 @@ public enum ClientProperty { if (isRequired() && value.isEmpty()) { throw new IllegalArgumentException(getKey() + " must be set!"); } + if (!type.isValidFormat(value)) { + throw new IllegalArgumentException( + "Invalid format for type \"" + type + "\" for provided value: " + value); + } return value; } @@ -170,12 +183,24 @@ public enum ClientProperty { return (value == null || value.isEmpty()); } - public Long getLong(Properties properties) { + public Long getBytes(Properties properties) { + String value = getValue(properties); + if (value.isEmpty()) { + return null; + } + checkState(getType() == PropertyType.BYTES, + "Invalid type getting bytes. Type must be " + PropertyType.BYTES + ", not " + getType()); + return ConfigurationTypeHelper.getMemoryAsBytes(value); + } + + public Long getTimeInMillis(Properties properties) { String value = getValue(properties); if (value.isEmpty()) { return null; } - return Long.parseLong(value); + checkState(getType() == PropertyType.TIMEDURATION, "Invalid type getting time. Type must be " + + PropertyType.TIMEDURATION + ", not " + getType()); + return ConfigurationTypeHelper.getTimeInMillis(value); } public Integer getInteger(Properties properties) { @@ -194,6 +219,18 @@ public enum ClientProperty { return Boolean.valueOf(value); } + public void setBytes(Properties properties, Long bytes) { + checkState(PropertyType.BYTES == getType(), "Invalid type setting " + "bytes. Type must be " + + PropertyType.BYTES + ", not " + getType()); + properties.setProperty(getKey(), bytes.toString()); + } + + public void setTimeInMillis(Properties properties, Long milliseconds) { + checkState(PropertyType.TIMEDURATION == getType(), "Invalid type setting " + + "time. Type must be " + PropertyType.TIMEDURATION + ", not " + getType()); + properties.setProperty(getKey(), milliseconds + "ms"); + } + public static Properties getPrefix(Properties properties, String prefix) { Properties props = new Properties(); for (Object keyObj : properties.keySet()) { diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java index 295ed69..71a5d06 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java @@ -116,7 +116,7 @@ public enum PropertyType { "A list of fully qualified java class names representing classes on the classpath.\n" + "An example is 'java.lang.String', rather than 'String'"), - DURABILITY("durability", in(true, null, "none", "log", "flush", "sync"), + DURABILITY("durability", in(true, null, "default", "none", "log", "flush", "sync"), "One of 'none', 'log', 'flush' or 'sync'."), STRING("string", x -> true, diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientConfConverterTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientConfConverterTest.java index 35eb547..5b99e34 100644 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientConfConverterTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ClientConfConverterTest.java @@ -37,7 +37,7 @@ public class ClientConfConverterTest { before.setProperty(ClientProperty.SSL_TRUSTSTORE_PATH.getKey(), "trust_path"); before.setProperty(ClientProperty.SASL_ENABLED.getKey(), "true"); before.setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY.getKey(), "primary"); - before.setProperty(ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS.getKey(), "5"); + before.setProperty(ClientProperty.BATCH_WRITER_THREADS_MAX.getKey(), "5"); Properties after = ClientConfConverter.toProperties(ClientConfConverter.toClientConf(before)); assertEquals(before, after); diff --git a/core/src/test/java/org/apache/accumulo/core/conf/ClientPropertyTest.java b/core/src/test/java/org/apache/accumulo/core/conf/ClientPropertyTest.java index 65df313..40d20d7 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/ClientPropertyTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/ClientPropertyTest.java @@ -23,7 +23,9 @@ import java.util.Properties; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class ClientPropertyTest { @@ -54,4 +56,30 @@ public class ClientPropertyTest { ClientProperty.setKerberosKeytab(props, "/path/to/keytab"); assertEquals("/path/to/keytab", ClientProperty.AUTH_TOKEN.getValue(props)); } + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void testTypes() { + Properties props = new Properties(); + props.setProperty(ClientProperty.BATCH_WRITER_LATENCY_MAX.getKey(), "10s"); + Long value = ClientProperty.BATCH_WRITER_LATENCY_MAX.getTimeInMillis(props); + assertEquals(10000L, value.longValue()); + + props.setProperty(ClientProperty.BATCH_WRITER_MEMORY_MAX.getKey(), "555M"); + value = ClientProperty.BATCH_WRITER_MEMORY_MAX.getBytes(props); + assertEquals(581959680L, value.longValue()); + + ClientProperty.BATCH_WRITER_MEMORY_MAX.setBytes(props, 5819L); + value = ClientProperty.BATCH_WRITER_MEMORY_MAX.getBytes(props); + assertEquals(5819L, value.longValue()); + + ClientProperty.BATCH_WRITER_LATENCY_MAX.setTimeInMillis(props, 1234L); + value = ClientProperty.BATCH_WRITER_LATENCY_MAX.getTimeInMillis(props); + assertEquals(1234L, value.longValue()); + + exception.expect(IllegalStateException.class); + ClientProperty.BATCH_WRITER_LATENCY_MAX.getBytes(props); + } } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java index 9f325d4..6df69f6 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/OutputConfigurator.java @@ -17,10 +17,10 @@ package org.apache.accumulo.hadoopImpl.mapreduce.lib; import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_DURABILITY; -import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC; -import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES; -import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC; -import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS; +import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_LATENCY_MAX; +import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_MEMORY_MAX; +import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_THREADS_MAX; +import static org.apache.accumulo.core.conf.ClientProperty.BATCH_WRITER_TIMEOUT_MAX; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -98,18 +98,18 @@ public class OutputConfigurator extends ConfiguratorBase { String property = props.getProperty(BATCH_WRITER_DURABILITY.getKey()); if (property != null) bwConfig.setDurability(DurabilityImpl.fromString(property)); - property = props.getProperty(BATCH_WRITER_MAX_LATENCY_SEC.getKey()); - if (property != null) - bwConfig.setMaxLatency(Long.parseLong(property), TimeUnit.MILLISECONDS); - property = props.getProperty(BATCH_WRITER_MAX_MEMORY_BYTES.getKey()); - if (property != null) - bwConfig.setMaxMemory(Long.parseLong(property)); - property = props.getProperty(BATCH_WRITER_MAX_TIMEOUT_SEC.getKey()); - if (property != null) - bwConfig.setTimeout(Long.parseLong(property), TimeUnit.MILLISECONDS); - property = props.getProperty(BATCH_WRITER_MAX_WRITE_THREADS.getKey()); - if (property != null) - bwConfig.setMaxWriteThreads(Integer.parseInt(property)); + Long value = BATCH_WRITER_LATENCY_MAX.getTimeInMillis(props); + if (value != null) + bwConfig.setMaxLatency(value, TimeUnit.MILLISECONDS); + value = BATCH_WRITER_MEMORY_MAX.getBytes(props); + if (value != null) + bwConfig.setMaxMemory(value); + value = BATCH_WRITER_TIMEOUT_MAX.getTimeInMillis(props); + if (value != null) + bwConfig.setTimeout(value, TimeUnit.MILLISECONDS); + Integer intValue = BATCH_WRITER_THREADS_MAX.getInteger(props); + if (intValue != null) + bwConfig.setMaxWriteThreads(intValue); return bwConfig; }