This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
     new 14e0bd6  Multi table input for new MR. Closes #749 (#821)
14e0bd6 is described below

commit 14e0bd6f671ad12eada7a62aaa49243ca7eb000d
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Wed Dec 19 15:01:13 2018 -0500

    Multi table input for new MR. Closes #749 (#821)

    * Modified new MapReduce builder and implementation to allow multiple
      tables through the same fluent API as AccumuloInputFormat
    * Replaced setIterators method with addIterator
---
 .../hadoop/mapred/AccumuloInputFormat.java         |   4 +-
 .../hadoop/mapred/AccumuloRowInputFormat.java      |   2 +-
 .../hadoop/mapreduce/AccumuloInputFormat.java      |  11 ++
 .../hadoop/mapreduce/InputFormatBuilder.java       |  15 +-
 .../hadoopImpl/mapred/AbstractInputFormat.java     |  41 +----
 .../hadoopImpl/mapreduce/AbstractInputFormat.java  |  35 +---
 .../mapreduce/InputFormatBuilderImpl.java          | 198 +++++++++++----------
 .../hadoopImpl/mapreduce/InputTableConfig.java     |  98 +++++-----
 .../mapreduce/lib/InputConfigurator.java           |   2 +-
 .../hadoop/its/mapred/MultiTableInputFormatIT.java | 152 ++++++++++++++++
 .../its/mapreduce/MultiTableInputFormatIT.java     | 152 ++++++++++++++++
 .../hadoop/mapred/AccumuloInputFormatTest.java     |  11 ++
 .../hadoop/mapred/MultiTableInputFormatTest.java   | 123 +++++++++++++
 .../hadoop/mapreduce/AccumuloInputFormatTest.java  |  13 ++
 .../mapreduce/MultiTableInputFormatTest.java       | 125 +++++++++++++
 .../hadoopImpl/mapreduce/InputTableConfigTest.java |   2 +-
 16 files changed, 769 insertions(+), 215 deletions(-)

diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
index 6fabcf9..2523a0d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormat.java
@@ -54,7 +54,7 @@ public class AccumuloInputFormat implements InputFormat<Key,Value> {
    */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return AbstractInputFormat.getSplits(job, numSplits);
+    return AbstractInputFormat.getSplits(job);
   }
 
   @Override
@@ -96,6 +96,6 @@ public class AccumuloInputFormat implements InputFormat<Key,Value> {
    * Sets all the information required for this map reduce job.
    */
   public static InputFormatBuilder.ClientParams<JobConf> configure() {
-    return new InputFormatBuilderImpl<JobConf>(CLASS);
+    return new InputFormatBuilderImpl<>(CLASS);
   }
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
index bdcfbdb..cb1d650 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapred/AccumuloRowInputFormat.java
@@ -53,7 +53,7 @@ public class AccumuloRowInputFormat implements InputFormat<Text,PeekingIterator<
    */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return AbstractInputFormat.getSplits(job, numSplits);
+    return AbstractInputFormat.getSplits(job);
   }
 
   @Override
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
index 2bffe35..2c6259d 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormat.java
@@ -49,6 +49,17 @@ import org.slf4j.LoggerFactory;
  *     .store(job);
  * </pre>
  *
+ * Multiple tables can be set by configuring clientProperties once and then calling .table() for
+ * each table. The methods following a call to .table() apply only to that table. For example:
+ *
+ * <pre>
+ * AccumuloInputFormat.configure().clientProperties(props) // set client props once
+ *     .table(table1).auths(auths1).fetchColumns(cols1).batchScan(true) // options for table1
+ *     .table(table2).ranges(range2).auths(auths2).addIterator(iter2) // options for table2
+ *     .table(table3).ranges(range3).auths(auths3).addIterator(iter3) // options for table3
+ *     .store(job); // store all tables in the job when finished
+ * </pre>
+ *
  * For descriptions of all options see
  * {@link org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder.InputFormatOptions}
  *
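A complete driver built from the javadoc example above might look like the following sketch; the
connection property values and table names are placeholders, everything else uses only methods
added or kept by this change:

import java.util.Collections;
import java.util.Properties;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.mapreduce.Job;

public class TwoTableJobSetup {
  public static void main(String[] args) throws Exception {
    // Hypothetical connection details; normally loaded from an accumulo-client.properties file.
    Properties props = new Properties();
    props.setProperty("instance.name", "myinstance");
    props.setProperty("instance.zookeepers", "zkhost:2181");
    props.setProperty("auth.principal", "user");
    props.setProperty("auth.type", "password");
    props.setProperty("auth.token", "secret");

    Job job = Job.getInstance();
    job.setInputFormatClass(AccumuloInputFormat.class);

    IteratorSetting regex = new IteratorSetting(50, "regex",
        "org.apache.accumulo.core.iterators.user.RegExFilter");

    // Client properties are set once; each table() call opens a new per-table scope.
    AccumuloInputFormat.configure().clientProperties(props)
        .table("events").auths(Authorizations.EMPTY).batchScan(true)
        .table("audit").ranges(Collections.singletonList(new Range("2018", "2019")))
            .auths(Authorizations.EMPTY).addIterator(regex)
        .store(job);
  }
}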
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
index 4677bda..ddb4deb 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/InputFormatBuilder.java
@@ -62,12 +62,18 @@ public interface InputFormatBuilder {
    */
   interface TableParams<T> {
     /**
-     * Sets the name of the input table, over which this job will scan.
+     * Sets the name of the input table, over which this job will scan. At least one table is
+     * required before calling store(Job).
      *
      * @param tableName
      *          the table to use when the table name is null in the write call
      */
     InputFormatOptions<T> table(String tableName);
+
+    /**
+     * Finish configuring, verify and serialize options into the JobConf or Job
+     */
+    void store(T j) throws AccumuloException, AccumuloSecurityException;
   }
 
   /**
@@ -75,7 +81,7 @@ public interface InputFormatBuilder {
    *
    * @since 2.0
    */
-  interface InputFormatOptions<T> {
+  interface InputFormatOptions<T> extends TableParams<T> {
     /**
      * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations.
      * By default, all of the user's auths are set.
@@ -217,10 +223,5 @@ public interface InputFormatBuilder {
      * By default, this feature is <b>disabled</b>.
      */
     InputFormatOptions<T> batchScan(boolean value);
-
-    /**
-     * Finish configuring, verify and serialize options into the JobConf or Job
-     */
-    void store(T j) throws AccumuloException, AccumuloSecurityException;
   }
 }
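Because InputFormatOptions now extends TableParams, table() and store() can be called at any
point in the chain, which makes it easy to configure a variable number of tables. A short sketch
of that pattern, mirroring the testManyTables test added below (the table names and job are
illustrative):

import java.util.Arrays;
import java.util.Properties;

import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
import org.apache.hadoop.mapreduce.Job;

public class ManyTablesSetup {
  // Configure a variable number of tables; props and table names are placeholders.
  static void configureTables(Job job, Properties props) throws Exception {
    InputFormatBuilder.TableParams<Job> opts =
        AccumuloInputFormat.configure().clientProperties(props);
    for (String name : Arrays.asList("logs_2018", "logs_2019")) {
      opts.table(name).auths(Authorizations.EMPTY); // other per-table options go here
    }
    opts.store(job); // serializes every table's options into the job
  }
}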
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
index fc34f3f..4d1d4ea 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AbstractInputFormat.java
@@ -201,35 +201,6 @@ public abstract class AbstractInputFormat {
   }
 
   /**
-   * Fetches all {@link InputTableConfig}s that have been set on the given Hadoop job.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @return the {@link InputTableConfig} objects set on the job
-   * @since 1.6.0
-   */
-  protected static Map<String,InputTableConfig> getInputTableConfigs(JobConf job) {
-    return InputConfigurator.getInputTableConfigs(CLASS, job);
-  }
-
-  /**
-   * Fetches a {@link InputTableConfig} that has been set on the configuration for a specific
-   * table.
-   *
-   * <p>
-   * null is returned in the event that the table doesn't exist.
-   *
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param tableName
-   *          the table name for which to grab the config object
-   * @return the {@link InputTableConfig} for the given table
-   * @since 1.6.0
-   */
-  protected static InputTableConfig getInputTableConfig(JobConf job, String tableName) {
-    return InputConfigurator.getInputTableConfig(CLASS, job, tableName);
-  }
-
-  /**
    * An abstract base class to be used to create {@link org.apache.hadoop.mapred.RecordReader}
    * instances that convert from Accumulo
    * {@link org.apache.accumulo.core.data.Key}/{@link org.apache.accumulo.core.data.Value} pairs to
@@ -305,7 +276,8 @@ public abstract class AbstractInputFormat {
 
       // in case the table name changed, we can still use the previous name for terms of
       // configuration, but the scanner will use the table id resolved at job setup time
-      InputTableConfig tableConfig = getInputTableConfig(job, baseSplit.getTableName());
+      InputTableConfig tableConfig = InputConfigurator.getInputTableConfig(CLASS, job,
+          baseSplit.getTableName());
 
       log.debug("Created client with user: " + context.whoami());
       log.debug("Creating scanner for table: " + table);
@@ -452,18 +424,13 @@ public abstract class AbstractInputFormat {
   /**
    * Gets the splits of the tables that have been set on the job by reading the metadata table for
    * the specified ranges.
-   *
-   * @return the splits from the tables based on the ranges.
-   * @throws java.io.IOException
-   *           if a table set on the job doesn't exist or an error occurs initializing the tablet
-   *           locator
    */
-  public static InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+  public static InputSplit[] getSplits(JobConf job) throws IOException {
     validateOptions(job);
     Random random = new SecureRandom();
     LinkedList<InputSplit> splits = new LinkedList<>();
-    Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(job);
+    Map<String,InputTableConfig> tableConfigs = InputConfigurator.getInputTableConfigs(CLASS, job);
     try (AccumuloClient client = createClient(job)) {
       for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
         String tableName = tableConfigEntry.getKey();
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
index c6702b6..b845ff8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AbstractInputFormat.java
@@ -182,35 +182,6 @@ public abstract class AbstractInputFormat {
     return InputConfigurator.getScanAuthorizations(CLASS, context.getConfiguration());
   }
 
-  /**
-   * Fetches all {@link InputTableConfig}s that have been set on the given job.
-   *
-   * @param context
-   *          the Hadoop job instance to be configured
-   * @return the {@link InputTableConfig} objects for the job
-   * @since 1.6.0
-   */
-  public static Map<String,InputTableConfig> getInputTableConfigs(JobContext context) {
-    return InputConfigurator.getInputTableConfigs(CLASS, context.getConfiguration());
-  }
-
-  /**
-   * Fetches a {@link InputTableConfig} that has been set on the configuration for a specific
-   * table.
-   *
-   * <p>
-   * null is returned in the event that the table doesn't exist.
-   *
-   * @param context
-   *          the Hadoop job instance to be configured
-   * @param tableName
-   *          the table name for which to grab the config object
-   * @return the {@link InputTableConfig} for the given table
-   * @since 1.6.0
-   */
-  protected static InputTableConfig getInputTableConfig(JobContext context, String tableName) {
-    return InputConfigurator.getInputTableConfig(CLASS, context.getConfiguration(), tableName);
-  }
-
   // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
   /**
    * Check whether a configuration is fully configured to be used with an Accumulo
@@ -310,7 +281,8 @@ public abstract class AbstractInputFormat {
       // in case the table name changed, we can still use the previous name for terms of
       // configuration,
       // but the scanner will use the table id resolved at job setup time
-      InputTableConfig tableConfig = getInputTableConfig(attempt, split.getTableName());
+      InputTableConfig tableConfig = InputConfigurator.getInputTableConfig(CLASS,
+          attempt.getConfiguration(), split.getTableName());
 
       log.debug("Creating client with user: " + client.whoami());
       log.debug("Creating scanner for table: " + table);
@@ -476,7 +448,8 @@ public abstract class AbstractInputFormat {
     Random random = new SecureRandom();
     LinkedList<InputSplit> splits = new LinkedList<>();
     try (AccumuloClient client = createClient(context)) {
-      Map<String,InputTableConfig> tableConfigs = getInputTableConfigs(context);
+      Map<String,InputTableConfig> tableConfigs = InputConfigurator.getInputTableConfigs(CLASS,
+          context.getConfiguration());
 
       for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
         String tableName = tableConfigEntry.getKey();
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
index 3d85205..0af63d8 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputFormatBuilderImpl.java
@@ -19,9 +19,9 @@ package org.apache.accumulo.hadoopImpl.mapreduce;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.accumulo.core.client.Accumulo;
@@ -46,17 +46,10 @@ public class InputFormatBuilderImpl<T>
     InputFormatBuilder.TableParams<T>, InputFormatBuilder.InputFormatOptions<T> {
 
   Class<?> callingClass;
-  String tableName;
   ClientInfo clientInfo;
-  Authorizations scanAuths;
 
-  Optional<String> context = Optional.empty();
-  Collection<Range> ranges = Collections.emptyList();
-  Collection<IteratorSetting.Column> fetchColumns = Collections.emptyList();
-  Map<String,IteratorSetting> iterators = Collections.emptyMap();
-  Optional<SamplerConfiguration> samplerConfig = Optional.empty();
-  Map<String,String> hints = Collections.emptyMap();
-  BuilderBooleans bools = new BuilderBooleans();
+  String currentTable;
+  Map<String,InputTableConfig> tableConfigMap = Collections.emptyMap();
 
   public InputFormatBuilderImpl(Class<?> callingClass) {
     this.callingClass = callingClass;
@@ -71,38 +64,44 @@ public class InputFormatBuilderImpl<T>
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> table(String tableName) {
-    this.tableName = Objects.requireNonNull(tableName, "Table name must not be null");
+    this.currentTable = Objects.requireNonNull(tableName, "Table name must not be null");
+    if (tableConfigMap.isEmpty())
+      tableConfigMap = new LinkedHashMap<>();
+    tableConfigMap.put(currentTable, new InputTableConfig());
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> auths(Authorizations auths) {
-    this.scanAuths = Objects.requireNonNull(auths, "Authorizations must not be null");
+    tableConfigMap.get(currentTable)
+        .setScanAuths(Objects.requireNonNull(auths, "Authorizations must not be null"));
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> classLoaderContext(String context) {
-    this.context = Optional.of(context);
+    tableConfigMap.get(currentTable).setContext(context);
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> ranges(Collection<Range> ranges) {
-    this.ranges = ImmutableList
+    List<Range> newRanges = ImmutableList
         .copyOf(Objects.requireNonNull(ranges, "Collection of ranges is null"));
-    if (this.ranges.size() == 0)
+    if (newRanges.size() == 0)
       throw new IllegalArgumentException("Specified collection of ranges is empty.");
+    tableConfigMap.get(currentTable).setRanges(newRanges);
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> fetchColumns(
       Collection<IteratorSetting.Column> fetchColumns) {
-    this.fetchColumns = ImmutableList
+    Collection<IteratorSetting.Column> newFetchColumns = ImmutableList
         .copyOf(Objects.requireNonNull(fetchColumns, "Collection of fetch columns is null"));
-    if (this.fetchColumns.size() == 0)
+    if (newFetchColumns.size() == 0)
       throw new IllegalArgumentException("Specified collection of fetch columns is empty.");
+    tableConfigMap.get(currentTable).fetchColumns(newFetchColumns);
     return this;
   }
 
@@ -110,57 +109,56 @@ public class InputFormatBuilderImpl<T>
   public InputFormatBuilder.InputFormatOptions<T> addIterator(IteratorSetting cfg) {
     // store iterators by name to prevent duplicates
     Objects.requireNonNull(cfg, "IteratorSetting must not be null.");
-    if (this.iterators.size() == 0)
-      this.iterators = new LinkedHashMap<>();
-    this.iterators.put(cfg.getName(), cfg);
+    tableConfigMap.get(currentTable).addIterator(cfg);
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> executionHints(Map<String,String> hints) {
-    this.hints = ImmutableMap
+    Map<String,String> newHints = ImmutableMap
        .copyOf(Objects.requireNonNull(hints, "Map of execution hints must not be null."));
-    if (hints.size() == 0)
+    if (newHints.size() == 0)
       throw new IllegalArgumentException("Specified map of execution hints is empty.");
+    tableConfigMap.get(currentTable).setExecutionHints(newHints);
     return this;
   }
 
   @Override
   public InputFormatBuilder.InputFormatOptions<T> samplerConfiguration(
       SamplerConfiguration samplerConfig) {
-    this.samplerConfig = Optional.of(samplerConfig);
+    tableConfigMap.get(currentTable).setSamplerConfiguration(samplerConfig);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> autoAdjustRanges(boolean value) {
-    bools.autoAdjustRanges = value;
+    tableConfigMap.get(currentTable).setAutoAdjustRanges(value);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> scanIsolation(boolean value) {
-    bools.scanIsolation = value;
+    tableConfigMap.get(currentTable).setUseIsolatedScanners(value);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> localIterators(boolean value) {
-    bools.localIters = value;
+    tableConfigMap.get(currentTable).setUseLocalIterators(value);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> offlineScan(boolean value) {
-    bools.offlineScan = value;
+    tableConfigMap.get(currentTable).setOfflineScan(value);
     return this;
   }
 
   @Override
   public InputFormatOptions<T> batchScan(boolean value) {
-    bools.batchScan = value;
+    tableConfigMap.get(currentTable).setUseBatchScan(value);
     if (value)
-      bools.autoAdjustRanges = true;
+      tableConfigMap.get(currentTable).setAutoAdjustRanges(true);
     return this;
   }
 
@@ -180,30 +178,40 @@ public class InputFormatBuilderImpl<T>
    */
   private void store(Job job) throws AccumuloException, AccumuloSecurityException {
     AbstractInputFormat.setClientInfo(job, clientInfo);
-    InputFormatBase.setInputTableName(job, tableName);
-
-    scanAuths = getUserAuths(scanAuths, clientInfo);
-    AbstractInputFormat.setScanAuthorizations(job, scanAuths);
-
-    // all optional values
-    if (context.isPresent())
-      AbstractInputFormat.setClassLoaderContext(job, context.get());
-    if (ranges.size() > 0)
-      InputFormatBase.setRanges(job, ranges);
-    if (iterators.size() > 0)
-      InputConfigurator.writeIteratorsToConf(callingClass, job.getConfiguration(),
-          iterators.values());
-    if (fetchColumns.size() > 0)
-      InputConfigurator.fetchColumns(callingClass, job.getConfiguration(), fetchColumns);
-    if (samplerConfig.isPresent())
-      InputFormatBase.setSamplerConfiguration(job, samplerConfig.get());
-    if (hints.size() > 0)
-      InputFormatBase.setExecutionHints(job, hints);
-    InputFormatBase.setAutoAdjustRanges(job, bools.autoAdjustRanges);
-    InputFormatBase.setScanIsolation(job, bools.scanIsolation);
-    InputFormatBase.setLocalIterators(job, bools.localIters);
-    InputFormatBase.setOfflineTableScan(job, bools.offlineScan);
-    InputFormatBase.setBatchScan(job, bools.batchScan);
+    if (tableConfigMap.size() == 0) {
+      throw new IllegalArgumentException("At least one Table must be configured for job.");
+    }
+    // if only one table use the single table configuration method
+    if (tableConfigMap.size() == 1) {
+      Map.Entry<String,InputTableConfig> entry = tableConfigMap.entrySet().iterator().next();
+      InputFormatBase.setInputTableName(job, entry.getKey());
+      InputTableConfig config = entry.getValue();
+      if (!config.getScanAuths().isPresent())
+        config.setScanAuths(getUserAuths(clientInfo));
+      AbstractInputFormat.setScanAuthorizations(job, config.getScanAuths().get());
+      // all optional values
+      if (config.getContext().isPresent())
+        AbstractInputFormat.setClassLoaderContext(job, config.getContext().get());
+      if (config.getRanges().size() > 0)
+        InputFormatBase.setRanges(job, config.getRanges());
+      if (config.getIterators().size() > 0)
+        InputConfigurator.writeIteratorsToConf(callingClass, job.getConfiguration(),
+            config.getIterators());
+      if (config.getFetchedColumns().size() > 0)
+        InputConfigurator.fetchColumns(callingClass, job.getConfiguration(),
+            config.getFetchedColumns());
+      if (config.getSamplerConfiguration() != null)
+        InputFormatBase.setSamplerConfiguration(job, config.getSamplerConfiguration());
+      if (config.getExecutionHints().size() > 0)
+        InputFormatBase.setExecutionHints(job, config.getExecutionHints());
+      InputFormatBase.setAutoAdjustRanges(job, config.shouldAutoAdjustRanges());
+      InputFormatBase.setScanIsolation(job, config.shouldUseIsolatedScanners());
+      InputFormatBase.setLocalIterators(job, config.shouldUseLocalIterators());
+      InputFormatBase.setOfflineTableScan(job, config.isOfflineScan());
+      InputFormatBase.setBatchScan(job, config.shouldBatchScan());
+    } else {
+      InputConfigurator.setInputTableConfigs(callingClass, job.getConfiguration(), tableConfigMap);
+    }
   }
 
   /**
@@ -211,52 +219,56 @@ public class InputFormatBuilderImpl<T>
    */
   private void store(JobConf jobConf) throws AccumuloException, AccumuloSecurityException {
     org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClientInfo(jobConf, clientInfo);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName(jobConf, tableName);
-
-    scanAuths = getUserAuths(scanAuths, clientInfo);
-    org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations(jobConf,
-        scanAuths);
-
-    // all optional values
-    if (context.isPresent())
-      org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext(jobConf,
-          context.get());
-    if (ranges.size() > 0)
-      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges(jobConf, ranges);
-    if (iterators.size() > 0)
-      InputConfigurator.writeIteratorsToConf(callingClass, jobConf, iterators.values());
-    if (fetchColumns.size() > 0)
-      InputConfigurator.fetchColumns(callingClass, jobConf, fetchColumns);
-    if (samplerConfig.isPresent())
-      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration(jobConf,
-          samplerConfig.get());
-    if (hints.size() > 0)
-      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints(jobConf, hints);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges(jobConf,
-        bools.autoAdjustRanges);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation(jobConf,
-        bools.scanIsolation);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators(jobConf,
-        bools.localIters);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan(jobConf,
-        bools.offlineScan);
-    org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan(jobConf, bools.batchScan);
+    if (tableConfigMap.size() == 0) {
+      throw new IllegalArgumentException("At least one Table must be configured for job.");
+    }
+    // if only one table use the single table configuration method
+    if (tableConfigMap.size() == 1) {
+      Map.Entry<String,InputTableConfig> entry = tableConfigMap.entrySet().iterator().next();
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setInputTableName(jobConf,
+          entry.getKey());
+      InputTableConfig config = entry.getValue();
+      if (!config.getScanAuths().isPresent())
+        config.setScanAuths(getUserAuths(clientInfo));
+      org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setScanAuthorizations(jobConf,
+          config.getScanAuths().get());
+      // all optional values
+      if (config.getContext().isPresent())
+        org.apache.accumulo.hadoopImpl.mapred.AbstractInputFormat.setClassLoaderContext(jobConf,
+            config.getContext().get());
+      if (config.getRanges().size() > 0)
+        org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setRanges(jobConf,
+            config.getRanges());
+      if (config.getIterators().size() > 0)
+        InputConfigurator.writeIteratorsToConf(callingClass, jobConf, config.getIterators());
+      if (config.getFetchedColumns().size() > 0)
+        InputConfigurator.fetchColumns(callingClass, jobConf, config.getFetchedColumns());
+      if (config.getSamplerConfiguration() != null)
+        org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setSamplerConfiguration(jobConf,
+            config.getSamplerConfiguration());
+      if (config.getExecutionHints().size() > 0)
+        org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setExecutionHints(jobConf,
+            config.getExecutionHints());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setAutoAdjustRanges(jobConf,
+          config.shouldAutoAdjustRanges());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setScanIsolation(jobConf,
+          config.shouldUseIsolatedScanners());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setLocalIterators(jobConf,
+          config.shouldUseLocalIterators());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setOfflineTableScan(jobConf,
+          config.isOfflineScan());
+      org.apache.accumulo.hadoopImpl.mapred.InputFormatBase.setBatchScan(jobConf,
+          config.shouldBatchScan());
+    } else {
+      InputConfigurator.setInputTableConfigs(callingClass, jobConf, tableConfigMap);
+    }
   }
 
-  private Authorizations getUserAuths(Authorizations scanAuths, ClientInfo clientInfo)
+  private Authorizations getUserAuths(ClientInfo clientInfo)
       throws AccumuloSecurityException, AccumuloException {
-    if (scanAuths != null)
-      return scanAuths;
     try (AccumuloClient c = Accumulo.newClient().from(clientInfo.getProperties()).build()) {
       return c.securityOperations().getUserAuthorizations(clientInfo.getPrincipal());
     }
   }
 
-  private static class BuilderBooleans {
-    boolean autoAdjustRanges = true;
-    boolean scanIsolation = false;
-    boolean offlineScan = false;
-    boolean localIters = false;
-    boolean batchScan = false;
-  }
 }
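As the rewritten store() methods show, a single table still falls back to the pre-existing
single-table configuration keys, while two or more tables are serialized as one InputTableConfig
per table. The new tests read those configs back through the (internal, hadoopImpl)
InputConfigurator; a sketch assuming a job already configured with multiple tables:

import java.util.Map;

import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
import org.apache.hadoop.mapreduce.Job;

public class ReadBackConfigs {
  // job is assumed to have been configured with two or more tables via store(job)
  static void dump(Job job) {
    Map<String,InputTableConfig> configs =
        InputConfigurator.getInputTableConfigs(AccumuloInputFormat.class, job.getConfiguration());
    configs.forEach((table, cfg) -> System.out.println(table + " -> " + cfg.getRanges()));
  }
}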
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java
index c90c92f..9d27d6f 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfig.java
@@ -24,15 +24,20 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -43,14 +48,19 @@ import org.apache.hadoop.io.Writable;
  */
 public class InputTableConfig implements Writable {
 
-  private List<IteratorSetting> iterators;
-  private List<Range> ranges;
-  private Collection<IteratorSetting.Column> columns;
+  // store iterators by name to prevent duplicates for addIterator
+  private Map<String,IteratorSetting> iterators = Collections.emptyMap();
+  private List<Range> ranges = Collections.emptyList();
+  private Collection<IteratorSetting.Column> columns = Collections.emptyList();
+
+  private Optional<Authorizations> scanAuths = Optional.empty();
+  private Optional<String> context = Optional.empty();
 
   private boolean autoAdjustRanges = true;
   private boolean useLocalIterators = false;
   private boolean useIsolatedScanners = false;
   private boolean offlineScan = false;
+  private boolean batchScan = false;
   private SamplerConfiguration samplerConfig = null;
   private Map<String,String> executionHints = Collections.emptyMap();
 
@@ -83,7 +93,7 @@ public class InputTableConfig implements Writable {
    * Returns the ranges to be queried in the configuration
    */
   public List<Range> getRanges() {
-    return ranges != null ? ranges : new ArrayList<>();
+    return ranges;
   }
 
   /**
@@ -107,23 +117,17 @@ public class InputTableConfig implements Writable {
     return columns != null ? columns : new HashSet<>();
   }
 
-  /**
-   * Set iterators on to be used in the query.
-   *
-   * @param iterators
-   *          the configurations for the iterators
-   * @since 1.6.0
-   */
-  public InputTableConfig setIterators(List<IteratorSetting> iterators) {
-    this.iterators = iterators;
-    return this;
+  public void addIterator(IteratorSetting cfg) {
+    if (this.iterators.isEmpty())
+      this.iterators = new LinkedHashMap<>();
+    this.iterators.put(cfg.getName(), cfg);
   }
 
   /**
    * Returns the iterators to be set on this configuration
    */
   public List<IteratorSetting> getIterators() {
-    return iterators != null ? iterators : new ArrayList<>();
+    return new LinkedList<>(iterators.values());
   }
 
   /**
@@ -217,7 +221,6 @@ public class InputTableConfig implements Writable {
    *
    * @param offlineScan
    *          the feature is enabled if true, disabled otherwise
-   * @since 1.6.0
    */
   public InputTableConfig setOfflineScan(boolean offlineScan) {
     this.offlineScan = offlineScan;
@@ -228,7 +231,6 @@ public class InputTableConfig implements Writable {
    * Determines whether a configuration has the offline table scan feature enabled.
    *
    * @return true if the feature is enabled, false otherwise
-   * @since 1.6.0
    * @see #setOfflineScan(boolean)
    */
   public boolean isOfflineScan() {
@@ -243,7 +245,6 @@ public class InputTableConfig implements Writable {
    *
    * @param useIsolatedScanners
    *          the feature is enabled if true, disabled otherwise
-   * @since 1.6.0
   */
   public InputTableConfig setUseIsolatedScanners(boolean useIsolatedScanners) {
     this.useIsolatedScanners = useIsolatedScanners;
@@ -254,55 +255,68 @@ public class InputTableConfig implements Writable {
    * Determines whether a configuration has isolation enabled.
    *
    * @return true if the feature is enabled, false otherwise
-   * @since 1.6.0
    * @see #setUseIsolatedScanners(boolean)
    */
   public boolean shouldUseIsolatedScanners() {
     return useIsolatedScanners;
   }
 
+  public void setUseBatchScan(boolean value) {
+    this.batchScan = value;
+  }
+
+  public boolean shouldBatchScan() {
+    return batchScan;
+  }
+
   /**
    * Set the sampler configuration to use when reading from the data.
    *
    * @see ScannerBase#setSamplerConfiguration(SamplerConfiguration)
    * @see InputFormatBase#setSamplerConfiguration(org.apache.hadoop.mapreduce.Job,
    *      SamplerConfiguration)
-   *
-   * @since 1.8.0
    */
   public void setSamplerConfiguration(SamplerConfiguration samplerConfiguration) {
     this.samplerConfig = samplerConfiguration;
   }
 
-  /**
-   *
-   * @since 1.8.0
-   */
   public SamplerConfiguration getSamplerConfiguration() {
     return samplerConfig;
   }
 
   /**
    * The execution hints to set on created scanners. See {@link ScannerBase#setExecutionHints(Map)}
-   *
-   * @since 2.0.0
   */
   public void setExecutionHints(Map<String,String> executionHints) {
     this.executionHints = executionHints;
   }
 
-  /**
-   * @since 2.0.0
-   */
   public Map<String,String> getExecutionHints() {
     return executionHints;
   }
 
+  public Optional<Authorizations> getScanAuths() {
+    return scanAuths;
+  }
+
+  public InputTableConfig setScanAuths(Authorizations scanAuths) {
+    this.scanAuths = Optional.of(scanAuths);
+    return this;
+  }
+
+  public Optional<String> getContext() {
+    return context;
+  }
+
+  public void setContext(String context) {
+    this.context = Optional.of(context);
+  }
+
   @Override
   public void write(DataOutput dataOutput) throws IOException {
     if (iterators != null) {
       dataOutput.writeInt(iterators.size());
-      for (IteratorSetting setting : iterators)
+      for (IteratorSetting setting : getIterators())
         setting.write(dataOutput);
     } else {
       dataOutput.writeInt(0);
@@ -340,7 +354,7 @@ public class InputTableConfig implements Writable {
       new SamplerConfigurationImpl(samplerConfig).write(dataOutput);
     }
 
-    if (executionHints == null || executionHints.size() == 0) {
+    if (executionHints.isEmpty()) {
       dataOutput.writeInt(0);
     } else {
       dataOutput.writeInt(executionHints.size());
@@ -356,9 +370,11 @@ public class InputTableConfig implements Writable {
     // load iterators
     long iterSize = dataInput.readInt();
     if (iterSize > 0)
-      iterators = new ArrayList<>();
-    for (int i = 0; i < iterSize; i++)
-      iterators.add(new IteratorSetting(dataInput));
+      iterators = new LinkedHashMap<>();
+    for (int i = 0; i < iterSize; i++) {
+      IteratorSetting newIter = new IteratorSetting(dataInput);
+      iterators.put(newIter.getName(), newIter);
+    }
 
     // load ranges
     long rangeSize = dataInput.readInt();
     if (rangeSize > 0)
@@ -419,17 +435,15 @@ public class InputTableConfig implements Writable {
       return false;
     if (useLocalIterators != that.useLocalIterators)
       return false;
-    if (columns != null ? !columns.equals(that.columns) : that.columns != null)
+    if (!Objects.equals(columns, that.columns))
       return false;
-    if (iterators != null ? !iterators.equals(that.iterators) : that.iterators != null)
+    if (!Objects.equals(iterators, that.iterators))
       return false;
-    if (ranges != null ? !ranges.equals(that.ranges) : that.ranges != null)
+    if (!Objects.equals(ranges, that.ranges))
       return false;
-    if (executionHints != null ? !executionHints.equals(that.executionHints)
-        : that.executionHints != null)
+    if (!Objects.equals(executionHints, that.executionHints))
       return false;
-    return samplerConfig != null ? samplerConfig.equals(that.samplerConfig)
-        : that.samplerConfig == null;
+    return Objects.equals(samplerConfig, that.samplerConfig);
   }
 
   @Override
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
index f52b427..e1da2b9 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/InputConfigurator.java
@@ -780,7 +780,7 @@ public class InputConfigurator extends ConfiguratorBase {
     InputTableConfig queryConfig = new InputTableConfig();
     List<IteratorSetting> itrs = getIterators(implementingClass, conf);
     if (itrs != null)
-      queryConfig.setIterators(itrs);
+      itrs.forEach(itr -> queryConfig.addIterator(itr));
     Set<IteratorSetting.Column> columns = getFetchedColumns(implementingClass, conf);
     if (columns != null)
       queryConfig.fetchColumns(columns);
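With setIterators(List) replaced by addIterator, InputTableConfig now keeps iterators in a
name-keyed LinkedHashMap, so adding a second IteratorSetting with the same name replaces the
first while insertion order is otherwise preserved. A small sketch (the iterator class names are
made up):

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;

public class IteratorDedup {
  public static void main(String[] args) {
    InputTableConfig cfg = new InputTableConfig();
    cfg.addIterator(new IteratorSetting(50, "filter", "com.example.FilterA"));
    cfg.addIterator(new IteratorSetting(60, "filter", "com.example.FilterB")); // same name wins
    System.out.println(cfg.getIterators().size()); // prints 1
  }
}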
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/MultiTableInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/MultiTableInputFormatIT.java
new file mode 100644
index 0000000..95244fe
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/MultiTableInputFormatIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapred.AccumuloInputFormat;
+import org.apache.accumulo.hadoopImpl.mapred.RangeInputSplit;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class MultiTableInputFormatIT extends AccumuloClusterHarness {
+
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  private static class MRTester extends Configured implements Tool {
+    private static class TestMapper implements Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      public void map(Key k, Value v, OutputCollector<Key,Value> output, Reporter reporter)
+          throws IOException {
+        try {
+          String tableName = ((RangeInputSplit) reporter.getInputSplit()).getTableName();
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      public void configure(JobConf job) {}
+
+      @Override
+      public void close() throws IOException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table1> <table2>");
+      }
+
+      String table1 = args[0];
+      String table2 = args[1];
+
+      JobConf job = new JobConf(getConf());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormat(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientProperties(getClientInfo().getProperties())
+          .table(table1).table(table2).store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormat(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      return JobClient.runJob(job).isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  @Test
+  public void testMap() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = createAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      BatchWriter bw2 = c.createBatchWriter(table2, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation t1m = new Mutation(new Text(String.format("%s_%09x", table1, i + 1)));
+        t1m.put(new Text(), new Text(),
+            new Value(String.format("%s_%09x", table1, i).getBytes()));
+        bw.addMutation(t1m);
+        Mutation t2m = new Mutation(new Text(String.format("%s_%09x", table2, i + 1)));
+        t2m.put(new Text(), new Text(),
+            new Value(String.format("%s_%09x", table2, i).getBytes()));
+        bw2.addMutation(t2m);
+      }
+      bw.close();
+      bw2.close();
+
+      MRTester.main(new String[] {table1, table2});
+      assertNull(e1);
+      assertNull(e2);
+    }
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/MultiTableInputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/MultiTableInputFormatIT.java
new file mode 100644
index 0000000..07349dd
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/MultiTableInputFormatIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.its.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoopImpl.mapreduce.RangeInputSplit;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+public class MultiTableInputFormatIT extends AccumuloClusterHarness {
+
+  private static AssertionError e1 = null;
+  private static AssertionError e2 = null;
+
+  private static class MRTester extends Configured implements Tool {
+
+    private static class TestMapper extends Mapper<Key,Value,Key,Value> {
+      Key key = null;
+      int count = 0;
+
+      @Override
+      protected void map(Key k, Value v, Context context)
+          throws IOException, InterruptedException {
+        try {
+          String tableName = ((RangeInputSplit) context.getInputSplit()).getTableName();
+          if (key != null)
+            assertEquals(key.getRow().toString(), new String(v.get()));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
+        } catch (AssertionError e) {
+          e1 = e;
+        }
+        key = new Key(k);
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+          assertEquals(100, count);
+        } catch (AssertionError e) {
+          e2 = e;
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+      if (args.length != 2) {
+        throw new IllegalArgumentException(
+            "Usage : " + MRTester.class.getName() + " <table1> <table2>");
+      }
+
+      String table1 = args[0];
+      String table2 = args[1];
+
+      Job job = Job.getInstance(getConf(),
+          this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+
+      AccumuloInputFormat.configure().clientProperties(getClientInfo().getProperties())
+          .table(table1).table(table2).store(job);
+
+      job.setMapperClass(TestMapper.class);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
+    public static void main(String[] args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir",
+          new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      assertEquals(0, ToolRunner.run(conf, new MRTester(), args));
+    }
+  }
+
+  /**
+   * Generate incrementing counts and attach table name to the key/value so that order and
+   * multi-table data can be verified.
+   */
+  @Test
+  public void testMap() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    try (AccumuloClient c = createAccumuloClient()) {
+      c.tableOperations().create(table1);
+      c.tableOperations().create(table2);
+      BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+      BatchWriter bw2 = c.createBatchWriter(table2, new BatchWriterConfig());
+      for (int i = 0; i < 100; i++) {
+        Mutation t1m = new Mutation(new Text(String.format("%s_%09x", table1, i + 1)));
+        t1m.put(new Text(), new Text(),
+            new Value(String.format("%s_%09x", table1, i).getBytes()));
+        bw.addMutation(t1m);
+        Mutation t2m = new Mutation(new Text(String.format("%s_%09x", table2, i + 1)));
+        t2m.put(new Text(), new Text(),
+            new Value(String.format("%s_%09x", table2, i).getBytes()));
+        bw2.addMutation(t2m);
+      }
+      bw.close();
+      bw2.close();
+
+      MRTester.main(new String[] {table1, table2});
+      assertNull(e1);
+      assertNull(e2);
+    }
+  }
+
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
index 2401477..65e1ddf 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/AccumuloInputFormatTest.java
@@ -40,6 +40,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TestName;
 
 public class AccumuloInputFormatTest {
@@ -49,6 +50,8 @@ public class AccumuloInputFormatTest {
 
   @Rule
   public TestName test = new TestName();
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
 
   @Before
   public void createJob() {
@@ -61,6 +64,14 @@ public class AccumuloInputFormatTest {
         .setupClientProperties();
   }
 
+  @Test
+  public void testMissingTable() throws Exception {
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+    exception.expect(IllegalArgumentException.class);
+    AccumuloInputFormat.configure().clientProperties(clientProps).store(new JobConf());
+  }
+
   /**
    * Check that the iterator configuration is getting stored in the Job conf correctly.
    */
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/MultiTableInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/MultiTableInputFormatTest.java
new file mode 100644
index 0000000..775596a
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapred/MultiTableInputFormatTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapred;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoop.mapreduce.InputFormatBuilder;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class MultiTableInputFormatTest {
+  public static final Class<AccumuloInputFormat> CLASS = AccumuloInputFormat.class;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext.
+   */
+  @Test
+  public void testStoreTables() throws Exception {
+    String table1Name = testName.getMethodName() + "1";
+    String table2Name = testName.getMethodName() + "2";
+    JobConf job = new JobConf();
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+    List<Range> ranges = singletonList(new Range("a", "b"));
+    Set<IteratorSetting.Column> cols = singleton(
+        new IteratorSetting.Column(new Text("CF1"), new Text("CQ1")));
+    IteratorSetting iter1 = new IteratorSetting(50, "iter1", "iterclass1");
+    IteratorSetting iter2 = new IteratorSetting(60, "iter2", "iterclass2");
+    List<IteratorSetting> allIters = new ArrayList<>();
+    allIters.add(iter1);
+    allIters.add(iter2);
+
+    // if auths are not set client will try to get from server, we don't want that here
+    Authorizations auths = Authorizations.EMPTY;
+
+    // @formatter:off
+    AccumuloInputFormat.configure().clientProperties(clientProps)
+        .table(table1Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter1)
+        .addIterator(iter2).localIterators(true).offlineScan(true) // end table 1
+        .table(table2Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter2) // end
+        .store(job);
+    // @formatter:on
+
+    InputTableConfig table1 = new InputTableConfig();
+    table1.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).setUseLocalIterators(true)
+        .setOfflineScan(true);
+    allIters.forEach(itr -> table1.addIterator(itr));
+    InputTableConfig table2 = new InputTableConfig();
+    table2.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter2);
+
+    assertEquals(table1, InputConfigurator.getInputTableConfig(CLASS, job, table1Name));
+    assertEquals(table2, InputConfigurator.getInputTableConfig(CLASS, job, table2Name));
+  }
+
+  @Test
+  public void testManyTables() throws Exception {
+    JobConf job = new JobConf();
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+
+    // if auths are not set client will try to get from server, we don't want that here
+    Authorizations auths = Authorizations.EMPTY;
+
+    // set the client properties once then loop over tables
+    InputFormatBuilder.TableParams<JobConf> opts = AccumuloInputFormat.configure()
+        .clientProperties(clientProps);
+    for (int i = 0; i < 10_000; i++) {
+      List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+      Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+      IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+      opts.table("table" + i).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter);
+    }
+    opts.store(job);
+
+    // verify
+    Map<String,InputTableConfig> configs = InputConfigurator.getInputTableConfigs(CLASS, job);
+    assertEquals(10_000, configs.size());
+
+    // create objects to test against
+    for (int i = 0; i < 10_000; i++) {
+      InputTableConfig t = new InputTableConfig();
+      List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+      Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+      IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+      t.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter);
+      assertEquals(t, configs.get("table" + i));
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
index 227eb84..ac22733 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/AccumuloInputFormatTest.java
@@ -39,9 +39,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 public class AccumuloInputFormatTest {
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   static Properties clientProperties;
 
   @BeforeClass
@@ -59,6 +64,14 @@ public class AccumuloInputFormatTest {
     return cp;
   }
 
+  @Test
+  public void testMissingTable() throws Exception {
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+    exception.expect(IllegalArgumentException.class);
+    AccumuloInputFormat.configure().clientProperties(clientProps).store(Job.getInstance());
+  }
+
   /**
    * Check that the iterator configuration is getting stored in the Job conf correctly.
   */
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/MultiTableInputFormatTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/MultiTableInputFormatTest.java
new file mode 100644
index 0000000..8cd353b
--- /dev/null
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/mapreduce/MultiTableInputFormatTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.hadoop.mapreduce;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;
+import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class MultiTableInputFormatTest {
+  public static final Class<AccumuloInputFormat> CLASS = AccumuloInputFormat.class;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * Verify {@link InputTableConfig} objects get correctly serialized in the JobContext.
+   */
+  @Test
+  public void testStoreTables() throws Exception {
+    String table1Name = testName.getMethodName() + "1";
+    String table2Name = testName.getMethodName() + "2";
+    Job job = Job.getInstance();
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+    List<Range> ranges = singletonList(new Range("a", "b"));
+    Set<IteratorSetting.Column> cols = singleton(
+        new IteratorSetting.Column(new Text("CF1"), new Text("CQ1")));
+    IteratorSetting iter1 = new IteratorSetting(50, "iter1", "iterclass1");
+    IteratorSetting iter2 = new IteratorSetting(60, "iter2", "iterclass2");
+    List<IteratorSetting> allIters = new ArrayList<>();
+    allIters.add(iter1);
+    allIters.add(iter2);
+
+    // if auths are not set client will try to get from server, we don't want that here
+    Authorizations auths = Authorizations.EMPTY;
+
+    // @formatter:off
+    AccumuloInputFormat.configure().clientProperties(clientProps)
+        .table(table1Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter1)
+        .addIterator(iter2).localIterators(true).offlineScan(true) // end table 1
+        .table(table2Name).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter2) // end
+        .store(job);
+    // @formatter:on
+
+    InputTableConfig table1 = new InputTableConfig();
+    table1.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).setUseLocalIterators(true)
+        .setOfflineScan(true);
+    allIters.forEach(itr -> table1.addIterator(itr));
+    InputTableConfig table2 = new InputTableConfig();
+    table2.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter2);
+
+    Configuration jc = job.getConfiguration();
+    assertEquals(table1, InputConfigurator.getInputTableConfig(CLASS, jc, table1Name));
+    assertEquals(table2, InputConfigurator.getInputTableConfig(CLASS, jc, table2Name));
+  }
+
+  @Test
+  public void testManyTables() throws Exception {
+    Job job = Job.getInstance();
+    Properties clientProps = org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormatTest
+        .setupClientProperties();
+
+    // if auths are not set client will try to get from server, we don't want that here
+    Authorizations auths = Authorizations.EMPTY;
+
+    // set the client properties once then loop over tables
+    InputFormatBuilder.TableParams<Job> opts = AccumuloInputFormat.configure()
+        .clientProperties(clientProps);
+    for (int i = 0; i < 10_000; i++) {
+      List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+      Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+      IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+      opts.table("table" + i).auths(auths).ranges(ranges).fetchColumns(cols).addIterator(iter);
+    }
+    opts.store(job);
+
+    // verify
+    Map<String,InputTableConfig> configs = InputConfigurator.getInputTableConfigs(CLASS,
+        job.getConfiguration());
+    assertEquals(10_000, configs.size());
+
+    // create objects to test against
+    for (int i = 0; i < 10_000; i++) {
+      InputTableConfig t = new InputTableConfig();
+      List<Range> ranges = singletonList(new Range("a" + i, "b" + i));
+      Set<Column> cols = singleton(new Column(new Text("CF" + i), new Text("CQ" + i)));
+      IteratorSetting iter = new IteratorSetting(50, "iter" + i, "iterclass" + i);
+      t.setScanAuths(auths).setRanges(ranges).fetchColumns(cols).addIterator(iter);
+      assertEquals(t, configs.get("table" + i));
+    }
+  }
+}
diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java
index 0d25fee..74ab698 100644
--- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java
+++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoopImpl/mapreduce/InputTableConfigTest.java
@@ -97,7 +97,7 @@ public class InputTableConfigTest {
     List<IteratorSetting> settings = new ArrayList<>();
     settings.add(new IteratorSetting(50, "iter", "iterclass"));
     settings.add(new IteratorSetting(55, "iter2", "iterclass2"));
-    tableQueryConfig.setIterators(settings);
+    settings.forEach(itr -> tableQueryConfig.addIterator(itr));
     byte[] serialized = serialize(tableQueryConfig);
     InputTableConfig actualConfig = deserialize(serialized);
     assertEquals(actualConfig.getIterators(), settings);