http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java new file mode 100644 index 0000000..88f9030 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java @@ -0,0 +1,305 @@ +package mvm.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import mvm.rya.accumulo.AccumuloRdfConstants; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.indexing.accumulo.ConfigUtils; + +import java.io.IOException; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.iterators.user.AgeOffFilter; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Tool; +import org.openrdf.rio.RDFFormat; + +import com.google.common.base.Preconditions; + +/** + * Base class for MapReduce tools that interact with Accumulo-backed Rya. Holds + * a {@link Configuration} to keep track of connection parameters. + * <p> + * Can be configured to read input from Rya, either as + * {@link RyaStatementWritable}s or as Accumulo rows, or to read statements from + * RDF files. + * <p> + * Can be configured to send output either by inserting RyaStatementWritables to + * a Rya instance, or by writing arbitrary + * {@link org.apache.accumulo.core.data.Mutation}s directly to Accumulo tables. 
+ */ +public abstract class AbstractAccumuloMRTool implements Tool { + static int DEFAULT_IO_SORT_MB = 256; + + protected Configuration conf; + + // Connection parameters + protected String zk; + protected String instance; + protected String userName; + protected String pwd; + protected Authorizations authorizations; + protected boolean mock = false; + protected boolean hdfsInput = false; + protected String ttl; + protected String tablePrefix; + protected TABLE_LAYOUT rdfTableLayout; + + /** + * Gets the Configuration containing any relevant options. + * @return This Tool's Configuration object. + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Set this Tool's Configuration. + */ + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + } + + /** + * Initializes configuration parameters, checking that required parameters + * are found and ensuring that options corresponding to multiple property + * names are set consistently. Requires at least that the username, + * password, and instance name are all configured. Zookeeper hosts must be + * configured if not using a mock instance. Table prefix, if not provided, + * will be set to {@link RdfCloudTripleStoreConstants#TBL_PRFX_DEF}. Should + * be called before configuring input/output. See {@link MRUtils} for + * configuration properties. + */ + protected void init() { + // Load configuration parameters + zk = MRUtils.getACZK(conf); + instance = MRUtils.getACInstance(conf); + userName = MRUtils.getACUserName(conf); + pwd = MRUtils.getACPwd(conf); + mock = MRUtils.getACMock(conf, false); + ttl = MRUtils.getACTtl(conf); + tablePrefix = MRUtils.getTablePrefix(conf); + rdfTableLayout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.OSP); + hdfsInput = conf.getBoolean(MRUtils.AC_HDFS_INPUT_PROP, false); + // Set authorizations if specified + String authString = conf.get(MRUtils.AC_AUTH_PROP); + if (authString != null && !authString.isEmpty()) { + authorizations = new Authorizations(authString.split(",")); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency + } + else { + authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; + } + // Set table prefix to the default if not set + if (tablePrefix == null) { + tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; + MRUtils.setTablePrefix(conf, tablePrefix); + } + // Check for required configuration parameters + Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set."); + Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set."); + Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set."); + Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set."); + RdfCloudTripleStoreConstants.prefixTables(tablePrefix); + // If connecting to real accumulo, set additional parameters and require zookeepers + if (!mock) { + Preconditions.checkNotNull(zk, "Zookeeper hosts not set (" + MRUtils.AC_ZK_PROP + ")"); + conf.setBoolean("mapred.map.tasks.speculative.execution", false); + conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); + if (conf.get(MRUtils.HADOOP_IO_SORT_MB) == null) { + conf.setInt(MRUtils.HADOOP_IO_SORT_MB, DEFAULT_IO_SORT_MB); + } + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency + } + // Ensure consistency between alternative configuration properties + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance); + 
conf.set(ConfigUtils.CLOUDBASE_USER, userName); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix); + } + + /** + * Sets up Accumulo input for a job: the job receives + * ({@link org.apache.accumulo.core.data.Key}, + * {@link org.apache.accumulo.core.data.Value}) pairs from the table + * specified by the configuration (using + * {@link MRUtils#TABLE_PREFIX_PROPERTY} and + * {@link MRUtils#TABLE_LAYOUT_PROP}). + * @param job MapReduce Job to configure + * @throws AccumuloSecurityException if connecting to Accumulo with the + * given username and password fails. + */ + protected void setupAccumuloInput(Job job) throws AccumuloSecurityException { + // set up accumulo input + if (!hdfsInput) { + job.setInputFormatClass(AccumuloInputFormat.class); + } else { + job.setInputFormatClass(AccumuloHDFSFileInputFormat.class); + } + AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); + String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix); + AccumuloInputFormat.setInputTableName(job, tableName); + AccumuloInputFormat.setScanAuthorizations(job, authorizations); + if (mock) { + AccumuloInputFormat.setMockInstance(job, instance); + } else { + ClientConfiguration clientConfig = ClientConfiguration.loadDefault() + .withInstance(instance).withZkHosts(zk); + AccumuloInputFormat.setZooKeeperInstance(job, clientConfig); + } + if (ttl != null) { + IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName()); + AgeOffFilter.setTTL(setting, Long.valueOf(ttl)); + AccumuloInputFormat.addIterator(job, setting); + } + } + + /** + * Sets up Rya input for a job: the job receives + * ({@link org.apache.hadoop.io.LongWritable}, {@link RyaStatementWritable}) + * pairs from a Rya instance. Uses the same configuration properties to + * connect as direct Accumulo input, but returns statement data instead of + * row data. + * @param job Job to configure + * @throws AccumuloSecurityException if connecting to Accumulo with the + * given username and password fails. + */ + protected void setupRyaInput(Job job) throws AccumuloSecurityException { + setupAccumuloInput(job); + job.setInputFormatClass(RyaInputFormat.class); + } + + /** + * Sets up RDF file input for a job: the job receives + * ({@link org.apache.hadoop.io.LongWritable}, {@link RyaStatementWritable}) + * pairs from RDF file(s) found at the specified path. + * @param job Job to configure + * @param inputPath File or directory name + * @param defaultFormat Default RDF serialization format, can be + * overridden by {@link MRUtils#FORMAT_PROP} + * @throws IOException if there's an error interacting with the + * {@link org.apache.hadoop.fs.FileSystem}. + */ + protected void setupFileInput(Job job, String inputPath, RDFFormat defaultFormat) throws IOException { + RDFFormat format = MRUtils.getRDFFormat(conf); + if (format == null) { + format = defaultFormat; + } + RdfFileInputFormat.addInputPath(job, new Path(inputPath)); + RdfFileInputFormat.setRDFFormat(job, format); + job.setInputFormatClass(RdfFileInputFormat.class); + } + + /** + * Sets up Accumulo output for a job: allows the job to write (String, + * Mutation) pairs, where the Mutation will be written to the table named by + * the String. 
+ * @param job Job to configure + * @param outputTable Default table to send output to + * @throws AccumuloSecurityException if connecting to Accumulo with the + * given username and password fails + */ + protected void setupAccumuloOutput(Job job, String outputTable) throws AccumuloSecurityException { + AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setDefaultTableName(job, outputTable); + if (mock) { + AccumuloOutputFormat.setMockInstance(job, instance); + } else { + ClientConfiguration clientConfig = ClientConfiguration.loadDefault() + .withInstance(instance).withZkHosts(zk); + AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig); + } + job.setOutputFormatClass(AccumuloOutputFormat.class); + } + + /** + * Sets up Rya output for a job: allows the job to write + * {@link RyaStatementWritable} data, which will in turn be input into the + * configured Rya instance. To perform secondary indexing, use the + * configuration variables in {@link ConfigUtils}. + * @param job Job to configure + * @throws AccumuloSecurityException if connecting to Accumulo with the + * given username and password fails + */ + protected void setupRyaOutput(Job job) throws AccumuloSecurityException { + job.setOutputFormatClass(RyaOutputFormat.class); + job.setOutputValueClass(RyaStatementWritable.class); + // Specify default visibility of output rows, if given + RyaOutputFormat.setDefaultVisibility(job, conf.get(MRUtils.AC_CV_PROP)); + // Specify named graph, if given + RyaOutputFormat.setDefaultContext(job, conf.get(MRUtils.NAMED_GRAPH_PROP)); + // Set the output prefix + RyaOutputFormat.setTablePrefix(job, tablePrefix); + // Determine which indexers to use based on the config + RyaOutputFormat.setFreeTextEnabled(job, ConfigUtils.getUseFreeText(conf)); + RyaOutputFormat.setGeoEnabled(job, ConfigUtils.getUseGeo(conf)); + RyaOutputFormat.setTemporalEnabled(job, ConfigUtils.getUseTemporal(conf)); + RyaOutputFormat.setEntityEnabled(job, ConfigUtils.getUseEntity(conf)); + // Configure the Accumulo connection + AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setDefaultTableName(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + if (mock) { + RyaOutputFormat.setMockInstance(job, instance); + } else { + ClientConfiguration clientConfig = ClientConfiguration.loadDefault() + .withInstance(instance).withZkHosts(zk); + AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig); + } + } + + /** + * Connects to Accumulo, using the stored connection parameters. + * @return A Connector to an Accumulo instance, which could be a mock + * instance. + * @throws AccumuloException if connecting to Accumulo fails. + * @throws AccumuloSecurityException if authenticating with Accumulo fails. + */ + protected Connector getConnector() throws AccumuloSecurityException, AccumuloException { + Instance zooKeeperInstance; + if (mock) { + zooKeeperInstance = new MockInstance(instance); + } + else { + zooKeeperInstance = new ZooKeeperInstance(instance, zk); + } + return zooKeeperInstance.getConnector(userName, new PasswordToken(pwd)); + } +}
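For context, a concrete tool typically calls init() and then the relevant setup methods from its run() implementation. The sketch below is hypothetical (the package, class name, and job details are illustrative only) and assumes the connection properties described in MRUtils are already present in the Configuration:

package mvm.rya.accumulo.mr.examples; // hypothetical package

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.openrdf.rio.RDFFormat;

import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;

// Hypothetical tool: parses RDF files from HDFS and loads the resulting
// statements into the Rya instance named by the MRUtils connection properties.
public class ExampleLoaderTool extends AbstractAccumuloMRTool {
    @Override
    public int run(String[] args) throws Exception {
        init();                                           // validate and normalize connection parameters
        Job job = Job.getInstance(conf, "example rdf load");
        job.setJarByClass(ExampleLoaderTool.class);
        setupFileInput(job, args[0], RDFFormat.NTRIPLES); // (LongWritable, RyaStatementWritable) input
        setupRyaOutput(job);                              // statements written to Rya core tables and enabled indexes
        job.setNumReduceTasks(0);                         // map-only; the default identity mapper passes statements through
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new ExampleLoaderTool(), args));
    }
}
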
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java new file mode 100644 index 0000000..90461d1 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java @@ -0,0 +1,161 @@ +package mvm.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.rfile.RFileOperations; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.ArgumentChecker; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * {@link FileInputFormat} that finds the Accumulo tablet files on the HDFS + * disk, and uses that as the input for MapReduce jobs. 
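+ * <p>
+ * For example, a driver built on {@link AbstractAccumuloMRTool} might select
+ * this input path by setting the property named by
+ * {@link MRUtils#AC_HDFS_INPUT_PROP}:
+ * <pre>
+ * conf.setBoolean("ac.hdfsinput", true); // read tablet files directly from HDFS
+ * </pre>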
+ */ +public class AccumuloHDFSFileInputFormat extends FileInputFormat<Key, Value> { + + public static final Range ALLRANGE = new Range(new Text("\u0000"), new Text("\uFFFD")); + + @Override + public List<InputSplit> getSplits(JobContext jobContext) throws IOException { + //read the params from AccumuloInputFormat + Configuration conf = jobContext.getConfiguration(); + Instance instance = MRUtils.AccumuloProps.getInstance(jobContext); + String user = MRUtils.AccumuloProps.getUsername(jobContext); + AuthenticationToken password = MRUtils.AccumuloProps.getPassword(jobContext); + String table = MRUtils.AccumuloProps.getTablename(jobContext); + ArgumentChecker.notNull(instance); + ArgumentChecker.notNull(table); + + //find the files necessary + try { + Connector connector = instance.getConnector(user, password); + TableOperations tos = connector.tableOperations(); + String tableId = tos.tableIdMap().get(table); + Scanner scanner = connector.createScanner("accumulo.metadata", Authorizations.EMPTY); //TODO: auths? + scanner.setRange(new Range(new Text(tableId + "\u0000"), new Text(tableId + "\uFFFD"))); + scanner.fetchColumnFamily(new Text("file")); + List<String> files = new ArrayList<String>(); + List<InputSplit> fileSplits = new ArrayList<InputSplit>(); + for (Map.Entry<Key, Value> entry : scanner) { + String file = entry.getKey().getColumnQualifier().toString(); + Path path = new Path(file); + FileSystem fs = path.getFileSystem(conf); + FileStatus fileStatus = fs.getFileStatus(path); + long len = fileStatus.getLen(); + BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, len); + files.add(file); + fileSplits.add(new FileSplit(path, 0, len, fileBlockLocations[0].getHosts())); + } + System.out.println(files); + return fileSplits; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public RecordReader<Key, Value> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + return new RecordReader<Key, Value>() { + + private FileSKVIterator fileSKVIterator; + private boolean started = false; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + FileSplit split = (FileSplit) inputSplit; + Configuration job = taskAttemptContext.getConfiguration(); + Path file = split.getPath(); + FileSystem fs = file.getFileSystem(job); + Instance instance = MRUtils.AccumuloProps.getInstance(taskAttemptContext); + + fileSKVIterator = RFileOperations.getInstance().openReader(file.toString(), ALLRANGE, + new HashSet<ByteSequence>(), false, fs, job, instance.getConfiguration()); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (started) { + fileSKVIterator.next(); + } + else { + started = true; // don't move past the first record yet + } + return fileSKVIterator.hasTop(); + } + + @Override + public Key getCurrentKey() throws IOException, InterruptedException { + return fileSKVIterator.getTopKey(); + } + + @Override + public Value getCurrentValue() throws IOException, InterruptedException { + return fileSKVIterator.getTopValue(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void close() throws IOException { + } + }; + } + + /** + * Mapper that has no effect. 
+ */ + @SuppressWarnings("rawtypes") + public static class NullMapper extends Mapper { + @Override + protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/MRUtils.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/MRUtils.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/MRUtils.java new file mode 100644 index 0000000..409c978 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/MRUtils.java @@ -0,0 +1,317 @@ +package mvm.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.rio.RDFFormat; + +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; + +/** + * Contains constants and static methods for interacting with a + * {@link Configuration} and handling options likely to be relevant to Rya + * MapReduce jobs. Defines constant property names associated with Accumulo and + * Rya options, and some convenience methods to get and set these properties + * with respect to a given Configuration. + */ +public class MRUtils { + /** + * Property name for the name of a MapReduce job. + */ + public static final String JOB_NAME_PROP = "mapred.job.name"; + + /** + * Property name for the Accumulo username. + */ + public static final String AC_USERNAME_PROP = "ac.username"; + /** + * Property name for the Accumulo password. + */ + public static final String AC_PWD_PROP = "ac.pwd"; + + /** + * Property name for the list of zookeepers. + */ + public static final String AC_ZK_PROP = "ac.zk"; + /** + * Property name for the Accumulo instance name. + */ + public static final String AC_INSTANCE_PROP = "ac.instance"; + /** + * Property name for whether to run against a mock Accumulo instance. + */ + public static final String AC_MOCK_PROP = "ac.mock"; + + /** + * Property name for TTL; allows using an age-off filter on Accumulo input. + */ + public static final String AC_TTL_PROP = "ac.ttl"; + + /** + * Property name for scan authorizations when reading data from Accumulo. 
+ */ + public static final String AC_AUTH_PROP = "ac.auth"; + /** + * Property name for default visibility when writing data to Accumulo. + */ + public static final String AC_CV_PROP = "ac.cv"; + + /** + * Property name for whether to read Accumulo data directly from HDFS + * as opposed to through Accumulo itself. + */ + public static final String AC_HDFS_INPUT_PROP = "ac.hdfsinput"; + /** + * Property name for the table layout to use when reading data from Rya. + */ + public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout"; + + /** + * Property name for the Rya table prefix, identifying the Rya + * instance to work with. + */ + public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix"; + /** + * Property name for the RDF serialization format to use, when using RDF + * files. + */ + public static final String FORMAT_PROP = "rdf.format"; + /** + * Property name for a file input path, if using file input. + */ + public static final String INPUT_PATH = "input"; + /** + * Property name for specifying a default named graph to use when writing + * new statements. + */ + public static final String NAMED_GRAPH_PROP = "rdf.graph"; + + public static final String AC_TABLE_PROP = "ac.table"; + public static final String HADOOP_IO_SORT_MB = "io.sort.mb"; + public static final ValueFactory vf = new ValueFactoryImpl(); + + /** + * Gets the TTL from a given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @return The TTL that will be applied as an age-off filter for Accumulo + * input data, or null if not set. + */ + public static String getACTtl(Configuration conf) { + return conf.get(AC_TTL_PROP); + } + + /** + * Gets the username from a given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @return The configured Accumulo username, or null if not set. + */ + public static String getACUserName(Configuration conf) { + return conf.get(AC_USERNAME_PROP); + } + + /** + * Gets the password from a given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @return The configured Accumulo password, or null if not set. + */ + public static String getACPwd(Configuration conf) { + return conf.get(AC_PWD_PROP); + } + + /** + * Gets the zookeepers from a given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @return The configured zookeeper list, or null if not set. + */ + public static String getACZK(Configuration conf) { + return conf.get(AC_ZK_PROP); + } + + /** + * Gets the instance name from a given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @return The configured Accumulo instance name, or null if not set. + */ + public static String getACInstance(Configuration conf) { + return conf.get(AC_INSTANCE_PROP); + } + + /** + * Gets whether to use a mock instance from a given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @param defaultValue Default choice if the mock property hasn't been + * explicitly set in the Configuration. + * @return True if a mock instance should be used, false to connect to + * a running Accumulo. + */ + public static boolean getACMock(Configuration conf, boolean defaultValue) { + return conf.getBoolean(AC_MOCK_PROP, defaultValue); + } + + /** + * Gets the table prefix from a given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @return The configured Rya table prefix, or null if not set. 
+ */ + public static String getTablePrefix(Configuration conf) { + return conf.get(TABLE_PREFIX_PROPERTY); + } + + /** + * Gets the table layout that determines which Rya table to scan for input. + * @param conf Configuration containing MapReduce tool options. + * @param defaultLayout The layout to use if the Configuration doesn't + * specify any layout. + * @return The configured layout to use for reading statements from Rya. + */ + public static TABLE_LAYOUT getTableLayout(Configuration conf, TABLE_LAYOUT defaultLayout) { + return TABLE_LAYOUT.valueOf(conf.get(TABLE_LAYOUT_PROP, defaultLayout.toString())); + } + + /** + * Gets the RDF serialization format to use for parsing RDF files. + * @param conf Configuration containing MapReduce tool options. + * @return The configured RDFFormat, or null if not set. + */ + public static RDFFormat getRDFFormat(Configuration conf) { + return RDFFormat.valueOf(conf.get(FORMAT_PROP)); + } + + /** + * Sets the username in the given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @param str Accumulo username, used for input and/or output. + */ + public static void setACUserName(Configuration conf, String str) { + conf.set(AC_USERNAME_PROP, str); + } + + /** + * Sets the password in the given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @param str Accumulo password string, used for input and/or output. + */ + public static void setACPwd(Configuration conf, String str) { + conf.set(AC_PWD_PROP, str); + } + + /** + * Sets the zookeepers in the given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @param str List of zookeepers to use to connect to Accumulo. + */ + public static void setACZK(Configuration conf, String str) { + conf.set(AC_ZK_PROP, str); + } + + /** + * Sets the instance in the given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @param str Accumulo instance name, for input and/or output. + */ + public static void setACInstance(Configuration conf, String str) { + conf.set(AC_INSTANCE_PROP, str); + } + + /** + * Sets the TTL in the given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @param str TTL for Accumulo data. Rows older than this won't be scanned + * as input. + */ + public static void setACTtl(Configuration conf, String str) { + conf.set(AC_TTL_PROP, str); + } + + /** + * Sets whether to connect to a mock Accumulo instance. + * @param conf Configuration containing MapReduce tool options. + * @param mock true to use a mock instance, false to attempt to connect + * to a running Accumulo instance. + */ + public static void setACMock(Configuration conf, boolean mock) { + conf.setBoolean(AC_MOCK_PROP, mock); + } + + /** + * Sets the Rya table prefix in the given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @param prefix Prefix of the Rya tables to use for input and/or output. + */ + public static void setTablePrefix(Configuration conf, String prefix) { + conf.set(TABLE_PREFIX_PROPERTY, prefix); + } + + /** + * Sets the table layout in the given Configuration. + * @param conf Configuration containing MapReduce tool options. + * @param layout The Rya core table to scan when using Rya for input. + */ + public static void setTableLayout(Configuration conf, TABLE_LAYOUT layout) { + conf.set(TABLE_LAYOUT_PROP, layout.toString()); + } + + /** + * Sets the RDF serialization format in the given Configuration. 
+ * @param conf Configuration containing MapReduce tool options. + * @param format The expected format of any RDF text data. + */ + public static void setRDFFormat(Configuration conf, RDFFormat format) { + conf.set(FORMAT_PROP, format.getName()); + } + + /** + * Static class for accessing properties associated with Accumulo input + * formats. Can allow input formats that don't extend + * {@link InputFormatBase} to still use the same Accumulo input + * configuration options. + */ + @SuppressWarnings("rawtypes") + public static class AccumuloProps extends InputFormatBase { + /** + * @throws UnsupportedOperationException always. This class should only be used to access properties. + */ + @Override + public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { + throw new UnsupportedOperationException("Accumulo Props just holds properties"); + } + public static Instance getInstance(JobContext conf) { + return InputFormatBase.getInstance(conf); + } + public static AuthenticationToken getPassword(JobContext conf) { + return InputFormatBase.getAuthenticationToken(conf); + } + public static String getUsername(JobContext conf) { + return InputFormatBase.getPrincipal(conf); + } + public static String getTablename(JobContext conf) { + return InputFormatBase.getInputTableName(conf); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/RdfFileInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RdfFileInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RdfFileInputFormat.java new file mode 100644 index 0000000..208045e --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RdfFileInputFormat.java @@ -0,0 +1,443 @@ +package mvm.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.io.PipedReader; +import java.io.PipedWriter; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader; +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.rio.RDFFormat; +import org.openrdf.rio.RDFHandler; +import org.openrdf.rio.RDFHandlerException; +import org.openrdf.rio.RDFParseException; +import org.openrdf.rio.RDFParser; +import org.openrdf.rio.Rio; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.api.resolver.RyaTripleContext; + +/** + * {@link FileInputFormat} that can read multiple RDF files and convert into + * statements. + * <p> + * Expects all files to use the same RDF serialization format, which must be + * provided. + * <p> + * Reading and parsing is done asynchronously, so entire files need not be + * loaded into memory at once. Reading will block when a character buffer is + * full, waiting for the parser to consume more data. The parser will block when + * a statement buffer is full, waiting for the client to consume the statements + * generated so far. This enables large files, particularly useful for N-Triples + * and N-Quads formats, which can be parsed one line at a time. The size of each + * buffer can be configured. An error will be thrown if the parser takes too + * long to respond, and this timeout can be configured. + * <p> + * Only N-Triples and N-Quads files may be split into multiple + * {@link InputSplit}s per file, if large enough. Input is read line-by-line, + * and each line of an N-Triples or N-Quads file is self-contained, so any + * arbitrary split is valid. This means the number of input splits may be + * greater than the number of input files if and only if N-Triples or N-Quads is + * given as the RDF serialization format. 
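+ * <p>
+ * For example, a driver might configure the format, buffers, and timeout as
+ * follows (the buffer and timeout values shown are simply the defaults):
+ * <pre>
+ * RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES);
+ * RdfFileInputFormat.setCharBufferSize(job, 1024 * 1024); // characters held before the reader blocks
+ * RdfFileInputFormat.setStatementBufferSize(job, 1024);   // statements held before the parser blocks
+ * RdfFileInputFormat.setTimeout(job, 20);                 // seconds to wait on the parser
+ * </pre>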
+ */ +public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> { + private static final Logger logger = Logger.getLogger(RdfFileInputFormat.class); + private static final String PREFIX = RdfFileInputFormat.class.getSimpleName(); + private static final String CHAR_BUFFER_SIZE_PROP = PREFIX + ".char.buffer.size"; + private static final String STATEMENT_BUFFER_SIZE_PROP = PREFIX + ".statement.buffer.size"; + private static final String TIMEOUT_PROP = PREFIX + ".timeout"; + private static final String FORMAT_PROP = PREFIX + ".rdf.format"; + + private static final RDFFormat DEFAULT_RDF_FORMAT = RDFFormat.RDFXML; + private static final int DEFAULT_CHAR_BUFFER_SIZE = 1024*1024; + private static final int DEFAULT_STATEMENT_BUFFER_SIZE = 1024; + private static final int DEFAULT_TIMEOUT = 20; + + static final RyaStatementWritable DONE = new RyaStatementWritable(null, null); // signals the end of input + static final RyaStatementWritable ERROR = new RyaStatementWritable(null, null); // signals some error + + /** + * Set the RDF serialization format to parse. All input files must have the + * same format. + * @param job Job to apply the setting to + * @param format Format of any and all input files + */ + public static void setRDFFormat(Job job, RDFFormat format) { + job.getConfiguration().set(FORMAT_PROP, format.getName()); + } + + /** + * Specify the size, in characters, of the input buffer: hold this many + * characters in memory before blocking file input. + */ + public static void setCharBufferSize(Job job, int size) { + job.getConfiguration().setInt(CHAR_BUFFER_SIZE_PROP, size); + } + + /** + * Specify the size, in statements, of the parser output buffer: hold this + * many Statements in memory before blocking the parser. + */ + public static void setStatementBufferSize(Job job, int size) { + job.getConfiguration().setInt(STATEMENT_BUFFER_SIZE_PROP, size); + } + + /** + * Property to specify the timeout, in seconds: + */ + public static void setTimeout(Job job, int seconds) { + job.getConfiguration().setInt(TIMEOUT_PROP, seconds); + } + + private RDFFormat getRDFFormat(JobContext context) { + String name = context.getConfiguration().get(FORMAT_PROP); + return RDFFormat.valueOf(name); + } + + /** + * Determine whether an input file can be split. If the input format is + * configured to be anything other than N-Triples or N-Quads, then the + * structure of the file is important and it cannot be split arbitrarily. + * Otherwise, default to the superclass logic to determine whether splitting + * is appropriate. + * @return true if configured to use a line-based input format and the + * superclass implementation returns true. + */ + @Override + protected boolean isSplitable(JobContext context, Path filename) { + RDFFormat rdfFormat = getRDFFormat(context); + if (RDFFormat.NTRIPLES.equals(rdfFormat) || RDFFormat.NQUADS.equals(rdfFormat)) { + return super.isSplitable(context, filename); + } + return false; + } + + /** + * Instantiate a RecordReader for a given task attempt. + * @param inputSplit Input split to handle, may refer to part or all of + * an RDF file + * @param taskAttemptContext Contains configuration options. + * @return A RecordReader that reads and parses RDF text. 
+ */ + @Override + public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) { + Configuration conf = taskAttemptContext.getConfiguration(); + RDFFormat format = getRDFFormat(taskAttemptContext); + if (format == null) { + format = DEFAULT_RDF_FORMAT; + } + int charBufferSize = conf.getInt(CHAR_BUFFER_SIZE_PROP, DEFAULT_CHAR_BUFFER_SIZE); + int statementBufferSize = conf.getInt(STATEMENT_BUFFER_SIZE_PROP, DEFAULT_STATEMENT_BUFFER_SIZE); + int timeoutSeconds = conf.getInt(TIMEOUT_PROP, DEFAULT_TIMEOUT); + return new RdfFileRecordReader(format, charBufferSize, statementBufferSize, timeoutSeconds); + } + + /** + * Reads RDF files and generates RyaStatementWritables. Reads and parses + * data in parallel, so the entire file need not be loaded at once. + */ + class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler { + private RecordReader<Text, Text> lineReader; + private final PipedWriter pipeOut; + private final PipedReader pipeIn; + private final RDFParser rdfParser; + final BlockingQueue<RyaStatementWritable> statementCache; + + private long lineCount = 0; + private long statementCount = 0; + private RyaTripleContext tripleContext; + private RyaStatementWritable nextStatement = null; + private int timeoutSeconds; + private boolean noMoreStatements = false; + + Thread readerThread; + Thread parserThread; + private Exception threadException; + + /** + * Instantiates the RecordReader. + * @param format RDF serialization format to parse. + * @param charBufferSize Number of input characters to hold in + * memory; if exceeded, wait until the parser + * thread consumes some text before proceeding + * with reading input. + * @param statementBufferSize Number of output statements to hold in + * memory; if exceeded, wait until the + * client consumes data before proceeding + * with parsing. + * @param timeoutSeconds Number of seconds to wait for the parser + * thread to provide the next statement (or + * state that there are none). If exceeded, + * abort. + */ + RdfFileRecordReader(RDFFormat format, int charBufferSize, int statementBufferSize, int timeoutSeconds) { + rdfParser = Rio.createParser(format); + rdfParser.setRDFHandler(this); + statementCache = new LinkedBlockingQueue<RyaStatementWritable>(statementBufferSize); + pipeOut = new PipedWriter(); + pipeIn = new PipedReader(charBufferSize); + this.timeoutSeconds = timeoutSeconds; + logger.info("Initializing RecordReader with parameters:"); + logger.info("\tRDF serialization format = " + format.getName()); + logger.info("\tinput buffer size = " + charBufferSize + " characters"); + logger.info("\tstatement cache size = " + statementBufferSize); + logger.info("\tparser timeout = " + timeoutSeconds + " seconds"); + } + + /** + * Starts up one thread for reading text data (via an internal line + * based RecordReader) and one thread for receiving and parsing that + * data, each blocking when their respective buffers are full. + * @param inputSplit The section of data to read + * @param taskAttemptContext Contains configuration variables + * @throws IOException if an error is encountered initializing the line + * RecordReader or piping its output to the parser thread. 
+ * @throws InterruptedException if an error is encountered initializing + * the line RecordReader + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + Configuration conf = taskAttemptContext.getConfiguration(); + lineReader = new KeyValueLineRecordReader(conf); + lineReader.initialize(inputSplit, taskAttemptContext); + tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)); + pipeIn.connect(pipeOut); + + readerThread = new Thread(Thread.currentThread().getName() + " -- reader thread") { + @Override + public void run() { + try { + logger.info("Starting file reader"); + while (lineReader.nextKeyValue()) { + Text key = lineReader.getCurrentKey(); + Text value = lineReader.getCurrentValue(); + pipeOut.write(key.toString()); + if (value.getLength() > 0) { + pipeOut.write(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR); + pipeOut.write(value.toString()); + } + pipeOut.write('\n'); + lineCount++; + } + logger.info("Reached end of input text; read " + lineCount + " lines in total"); + } catch (IOException | InterruptedException e) { + logger.error("Error processing line " + (lineCount+1) + " of input", e); + fail(e, this); + throw new RuntimeException(e.getMessage(), e); + } + finally { + try { lineReader.close(); } catch (IOException e) { logger.warn(e); } + try { pipeOut.close(); } catch (IOException e) { logger.warn(e); } + } + } + }; + + parserThread = new Thread(Thread.currentThread().getName() + " -- parser thread") { + @Override + public void run() { + try { + logger.info("Starting parser"); + rdfParser.parse(pipeIn, ""); + } + catch (RDFHandlerException | RDFParseException | IOException e) { + logger.error(e.getMessage(), e); + fail(e, this); + throw new RuntimeException(e.getMessage(), e); + } + finally { + try { pipeIn.close(); } catch (IOException e) { logger.warn(e); } + } + } + }; + readerThread.start(); + parserThread.start(); + } + + private void fail(Exception e, Thread source) { + // Notify the main RecordReader of the error + statementCache.offer(ERROR); + threadException = e; + // Kill the reader thread if necessary + if (source != readerThread && readerThread.isAlive()) { + readerThread.interrupt(); + } + // Kill the parser thread if necessary + if (source != parserThread && parserThread.isAlive()) { + parserThread.interrupt(); + } + } + + /** + * Loads the next statement, if there is one, and returns whether there + * is one. Receives statements from the parser thread via a blocking + * queue. + * @throws InterruptedException if interrupted while waiting for a + * statement to show up in the queue. + * @throws IOException if the parser thread doesn't respond after the + * configured timeout, or if any thread reports an error. + * @return true if a valid statement was loaded, or false if there are + * no more statements in this input split. 
+ */ + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (noMoreStatements) { + return false; + } + nextStatement = statementCache.poll(timeoutSeconds, TimeUnit.SECONDS); + if (nextStatement == null) { + throw new IOException("Parser neither sending results nor signaling end of data after " + + timeoutSeconds + " seconds."); + } + else if (nextStatement == DONE) { + logger.info("Reached end of parsed RDF; read " + statementCount + " statements in total."); + nextStatement = null; + noMoreStatements = true; + return false; + } + else if (nextStatement == ERROR) { + nextStatement = null; + noMoreStatements = true; + throw new IOException("Error detected processing input.", threadException); + } + statementCount++; + return true; + } + + /** + * Gets the current key. + * @return the number of statements read so far, or null if all input + * has been read. + */ + @Override + public LongWritable getCurrentKey() { + if (noMoreStatements) { + return null; + } + return new LongWritable(statementCount); + } + + /** + * Gets the current value. + * @return a RyaStatementWritable loaded from RDF data, or null if all + * input has been read. + */ + @Override + public RyaStatementWritable getCurrentValue() { + return nextStatement; + } + + /** + * Gets the progress of the underlying line-based Record Reader. Does + * not include any information about the progress of the parser. + * @return The proportion of text input that has been read. + * @throws IOException if thrown by the internal RecordReader. + * @throws InterruptedException if thrown by the internal RecordReader. + */ + @Override + public float getProgress() throws IOException, InterruptedException { + return lineReader.getProgress(); + } + + /** + * Closes all the underlying resources. + */ + @Override + public void close() { + if (parserThread.isAlive()) { + parserThread.interrupt(); + } + if (readerThread.isAlive()) { + readerThread.interrupt(); + } + try { lineReader.close(); } catch (IOException e) { logger.warn(e); } + try { pipeOut.close(); } catch (IOException e) { logger.warn(e); } + try { pipeIn.close(); } catch (IOException e) { logger.warn(e); } + } + + /** + * Has no effect. + */ + @Override + public void startRDF() throws RDFHandlerException { + } + + /** + * Add a dummy item to the queue to signal that there will be no more + * statements. + * @throws RDFHandlerException if interrupted while waiting for + * the blocking queue to be ready to accept the done signal. + */ + @Override + public void endRDF() throws RDFHandlerException { + logger.info("Finished parsing RDF"); + try { + statementCache.put(DONE); + } catch (InterruptedException e) { + throw new RDFHandlerException("Interrupted while waiting to add done signal to statement queue", e); + } + } + + /** + * Has no effect. + */ + @Override + public void handleNamespace(String s, String s1) throws RDFHandlerException { + } + + /** + * Convert the {@link Statement} to a {@link RyaStatement}, wrap it in a + * {@link RyaStatementWritable}, and add it to the queue. + * @throws RDFHandlerException if interrupted while waiting for the + * blocking queue to be ready to accept statement data. 
+ */ + @Override + public void handleStatement(Statement statement) throws RDFHandlerException { + try { + statementCache.put(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement), tripleContext)); + } catch (InterruptedException e) { + throw new RDFHandlerException("Interrupted while waiting to add parsed statement to the statement queue", e); + } + } + + /** + * Has no effect. + */ + @Override + public void handleComment(String s) { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaInputFormat.java new file mode 100644 index 0000000..b6545b6 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaInputFormat.java @@ -0,0 +1,130 @@ +package mvm.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; +import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +/** + * Subclass of {@link AbstractInputFormat} for reading + * {@link RyaStatementWritable}s directly from a running Rya instance. + */ +public class RyaInputFormat extends AbstractInputFormat<Text, RyaStatementWritable> { + /** + * Instantiates a RecordReader for this InputFormat and a given task and + * input split. + * @param split Defines the portion of the input this RecordReader is + * responsible for. + * @param context The context of the task. + * @return A RecordReader that can be used to fetch RyaStatementWritables. + */ + @Override + public RecordReader<Text, RyaStatementWritable> createRecordReader(InputSplit split, TaskAttemptContext context) { + return new RyaStatementRecordReader(); + } + + /** + * Sets the table layout to use. + * @param conf Configuration to set the layout in. 
+ * @param layout Statements will be read from the Rya table associated + * with this layout. + */ + public static void setTableLayout(Job conf, TABLE_LAYOUT layout) { + conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name()); + } + + /** + * Retrieves RyaStatementWritable objects from Accumulo tables. + */ + public class RyaStatementRecordReader extends AbstractRecordReader<Text, RyaStatementWritable> { + private RyaTripleContext ryaContext; + private TABLE_LAYOUT tableLayout; + + @Override + protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, + RangeInputSplit split) { + } + + /** + * Initializes the RecordReader. + * @param inSplit Defines the portion of data to read. + * @param attempt Context for this task attempt. + * @throws IOException if thrown by the superclass's initialize method. + */ + @Override + public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { + super.initialize(inSplit, attempt); + this.tableLayout = MRUtils.getTableLayout(attempt.getConfiguration(), TABLE_LAYOUT.OSP); + //TODO verify that this is correct + this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(attempt.getConfiguration())); + } + + /** + * Load the next statement by converting the next Accumulo row to a + * statement, and make the new (key,value) pair available for retrieval. + * @return true if another (key,value) pair was fetched and is ready to + * be retrieved, false if there was none. + * @throws IOException if a row was loaded but could not be converted + * to a statement. + */ + @Override + public boolean nextKeyValue() throws IOException { + if (!scannerIterator.hasNext()) + return false; + Entry<Key, Value> entry = scannerIterator.next(); + ++numKeysRead; + currentKey = entry.getKey(); + try { + currentK = currentKey.getRow(); + RyaStatement stmt = this.ryaContext.deserializeTriple(this.tableLayout, + new TripleRow(entry.getKey().getRow().getBytes(), + entry.getKey().getColumnFamily().getBytes(), + entry.getKey().getColumnQualifier().getBytes(), + entry.getKey().getTimestamp(), + entry.getKey().getColumnVisibility().getBytes(), + entry.getValue().get())); + RyaStatementWritable writable = new RyaStatementWritable(); + writable.setRyaStatement(stmt); + currentV = writable; + } catch(TripleRowResolverException e) { + throw new IOException(e); + } + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java new file mode 100644 index 0000000..c9f7ffe --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java @@ -0,0 +1,597 @@ +package mvm.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.data.Mutation; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.model.vocabulary.XMLSchema; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRdfConstants; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.indexing.FreeTextIndexer; +import mvm.rya.indexing.GeoIndexer; +import mvm.rya.indexing.TemporalIndexer; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; + +/** + * {@link OutputFormat} that uses Rya, the {@link GeoIndexer}, the + * {@link FreeTextIndexer}, the {@link TemporalIndexer}, and the + * {@link EntityCentricIndex} as the sink of triple data. This + * OutputFormat ignores the Keys and only writes the Values to Rya. + * <p> + * The user must specify connection parameters for Rya, {@link GeoIndexer}, + * {@link FreeTextIndexer}, {@link TemporalIndexer}, and + * {@link EntityCentricIndex}, if secondary indexing is desired. 
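+ * <p>
+ * For example, a driver might direct output to a particular Rya instance and
+ * toggle the secondary indexers per job (the table prefix and visibility
+ * shown are placeholders):
+ * <pre>
+ * RyaOutputFormat.setTablePrefix(job, "rya_");
+ * RyaOutputFormat.setDefaultVisibility(job, "U");  // used when a statement has no visibility
+ * RyaOutputFormat.setFreeTextEnabled(job, false);
+ * RyaOutputFormat.setGeoEnabled(job, false);
+ * RyaOutputFormat.setTemporalEnabled(job, false);
+ * RyaOutputFormat.setEntityEnabled(job, false);
+ * job.setOutputFormatClass(RyaOutputFormat.class);
+ * </pre>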
+ */ +public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable> { + private static final Logger logger = Logger.getLogger(RyaOutputFormat.class); + + private static final String PREFIX = RyaOutputFormat.class.getSimpleName(); + private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory"; + private static final String ENABLE_FREETEXT = PREFIX + ".freetext.enable"; + private static final String ENABLE_GEO = PREFIX + ".geo.enable"; + private static final String ENABLE_TEMPORAL = PREFIX + ".temporal.enable"; + private static final String ENABLE_ENTITY = PREFIX + ".entity.enable"; + private static final String ENABLE_CORE = PREFIX + ".coretables.enable"; + private static final String OUTPUT_PREFIX_PROPERTY = PREFIX + ".tablePrefix"; + private static final String CV_PROPERTY = PREFIX + ".cv.default"; + private static final String CONTEXT_PROPERTY = PREFIX + ".context"; + + /** + * Set the default visibility of output: any statement whose visibility is + * null will be written with this visibility instead. If not specified, use + * an empty authorizations list. + * @param job Job to apply the setting to. + * @param visibility A comma-separated list of authorizations. + */ + public static void setDefaultVisibility(Job job, String visibility) { + if (visibility != null) { + job.getConfiguration().set(CV_PROPERTY, visibility); + } + } + + /** + * Set the default context (named graph) for any output: any statement whose + * context is null will be written with this context instead. If not + * specified, don't write any context. + * @param job Job to apply the setting to. + * @param context A context string, should be a syntactically valid URI. + */ + public static void setDefaultContext(Job job, String context) { + if (context != null) { + job.getConfiguration().set(CONTEXT_PROPERTY, context); + } + } + + /** + * Set the table prefix for output. + * @param job Job to apply the setting to. + * @param prefix The common prefix to all rya tables that output will be written to. + */ + public static void setTablePrefix(Job job, String prefix) { + job.getConfiguration().set(OUTPUT_PREFIX_PROPERTY, prefix); + } + + /** + * Set whether the free text index is enabled. Defaults to true. + * @param job Job to apply the setting to. + * @param enable Whether this job should add its output statements to the free text index. + */ + public static void setFreeTextEnabled(Job job, boolean enable) { + job.getConfiguration().setBoolean(ENABLE_FREETEXT, enable); + } + + /** + * Set whether the geo index is enabled. Defaults to true. + * @param job Job to apply the setting to. + * @param enable Whether this job should add its output statements to the geo index. + */ + public static void setGeoEnabled(Job job, boolean enable) { + job.getConfiguration().setBoolean(ENABLE_GEO, enable); + } + + /** + * Set whether the temporal index is enabled. Defaults to true. + * @param job Job to apply the setting to. + * @param enable Whether this job should add its output statements to the temporal index. + */ + public static void setTemporalEnabled(Job job, boolean enable) { + job.getConfiguration().setBoolean(ENABLE_TEMPORAL, enable); + } + + /** + * Set whether the entity-centric index is enabled. Defaults to true. + * @param job Job to apply the setting to. + * @param enable Whether this job should add its output statements to the entity-centric index. 
+ */ + public static void setEntityEnabled(Job job, boolean enable) { + job.getConfiguration().setBoolean(ENABLE_ENTITY, enable); + } + + /** + * Set whether to insert to the core Rya tables (spo, osp, po). Defaults to true. + * @param job Job to apply the setting to. + * @param enable Whether this job should output to the core tables. + */ + public static void setCoreTablesEnabled(Job job, boolean enable) { + job.getConfiguration().setBoolean(ENABLE_CORE, enable); + } + + /** + * Configure a job to use a mock Accumulo instance. + * @param job Job to configure + * @param instance Name of the mock instance + */ + public static void setMockInstance(Job job, String instance) { + AccumuloOutputFormat.setMockInstance(job, instance); + job.getConfiguration().setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); + job.getConfiguration().setBoolean(MRUtils.AC_MOCK_PROP, true); + } + + /** + * Verify that all of the enabled indexers can be initialized. + * @param jobContext Context containing configuration + * @throws IOException if initializing the core Rya indexer fails. + */ + @Override + public void checkOutputSpecs(JobContext jobContext) throws IOException { + Configuration conf = jobContext.getConfiguration(); + // make sure that all of the indexers can connect + getGeoIndexer(conf); + getFreeTextIndexer(conf); + getTemporalIndexer(conf); + getRyaIndexer(conf); + } + + /** + * Get the OutputCommitter for this OutputFormat. + * @param context Context of the MapReduce task + * @return A committer whose method implementations are empty. + */ + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + // copied from AccumuloOutputFormat + return new NullOutputFormat<Text, Mutation>().getOutputCommitter(context); + } + + /** + * Get this OutputFormat's RecordWriter. + * @param context Context of the MapReduce task + * @return A RecordWriter that writes statements to Rya tables. 
+ * @throws IOException if any enabled indexers can't be initialized + */ + @Override + public RecordWriter<Writable, RyaStatementWritable> getRecordWriter(TaskAttemptContext context) throws IOException { + return new RyaRecordWriter(context); + } + + private static GeoIndexer getGeoIndexer(Configuration conf) { + if (!conf.getBoolean(ENABLE_GEO, true)) { + return null; + } + GeoMesaGeoIndexer geo = new GeoMesaGeoIndexer(); + geo.setConf(conf); + return geo; + } + + private static FreeTextIndexer getFreeTextIndexer(Configuration conf) { + if (!conf.getBoolean(ENABLE_FREETEXT, true)) { + return null; + } + AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer(); + freeText.setConf(conf); + return freeText; + } + + private static TemporalIndexer getTemporalIndexer(Configuration conf) { + if (!conf.getBoolean(ENABLE_TEMPORAL, true)) { + return null; + } + AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer(); + temporal.setConf(conf); + return temporal; + } + + private static EntityCentricIndex getEntityIndexer(Configuration conf) { + if (!conf.getBoolean(ENABLE_ENTITY, true)) { + return null; + } + EntityCentricIndex entity = new EntityCentricIndex(); + entity.setConf(conf); + return entity; + } + + private static AccumuloRyaDAO getRyaIndexer(Configuration conf) throws IOException { + try { + if (!conf.getBoolean(ENABLE_CORE, true)) { + return null; + } + AccumuloRyaDAO ryaIndexer = new AccumuloRyaDAO(); + Connector conn = ConfigUtils.getConnector(conf); + ryaIndexer.setConnector(conn); + + AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + + String tablePrefix = conf.get(OUTPUT_PREFIX_PROPERTY, null); + if (tablePrefix != null) { + ryaConf.setTablePrefix(tablePrefix); + } + ryaConf.setDisplayQueryPlan(false); + ryaIndexer.setConf(ryaConf); + ryaIndexer.init(); + return ryaIndexer; + } catch (AccumuloException e) { + logger.error("Cannot create RyaIndexer", e); + throw new IOException(e); + } catch (AccumuloSecurityException e) { + logger.error("Cannot create RyaIndexer", e); + throw new IOException(e); + } catch (RyaDAOException e) { + logger.error("Cannot create RyaIndexer", e); + throw new IOException(e); + } + } + + /** + * RecordWriter that takes in {@link RyaStatementWritable}s and writes them + * to Rya tables. + */ + public static class RyaRecordWriter extends RecordWriter<Writable, RyaStatementWritable> + implements Closeable, Flushable { + private static final Logger logger = Logger.getLogger(RyaRecordWriter.class); + + private FreeTextIndexer freeTextIndexer; + private GeoIndexer geoIndexer; + private TemporalIndexer temporalIndexer; + private EntityCentricIndex entityIndexer; + private AccumuloRyaDAO ryaIndexer; + private RyaTripleContext tripleContext; + private MultiTableBatchWriter writer; + private byte[] cv = AccumuloRdfConstants.EMPTY_CV.getExpression(); + private RyaURI defaultContext = null; + + private static final long ONE_MEGABYTE = 1024L * 1024L; + private static final long AVE_STATEMENT_SIZE = 100L; + + private long bufferSizeLimit; + private long bufferCurrentSize = 0; + + private ArrayList<RyaStatement> buffer; + + /** + * Constructor. + * @param context Context for MapReduce task + * @throws IOException if the core Rya indexer or entity indexer can't + * be initialized + */ + public RyaRecordWriter(TaskAttemptContext context) throws IOException { + this(context.getConfiguration()); + } + + /** + * Constructor. + * @param conf Configuration containing any relevant options. 
+ * @throws IOException if the core Rya indexer or entity indexer can't + * be initialized + */ + public RyaRecordWriter(Configuration conf) throws IOException { + // set the visibility + String visibility = conf.get(CV_PROPERTY); + if (visibility != null) { + cv = visibility.getBytes(); + } + // set the default context + String context = conf.get(CONTEXT_PROPERTY, ""); + if (context != null && !context.isEmpty()) { + defaultContext = new RyaURI(context); + } + + // set up the buffer + bufferSizeLimit = conf.getLong(MAX_MUTATION_BUFFER_SIZE, ONE_MEGABYTE); + int bufferCapacity = (int) (bufferSizeLimit / AVE_STATEMENT_SIZE); + buffer = new ArrayList<RyaStatement>(bufferCapacity); + + // set up the indexers + freeTextIndexer = getFreeTextIndexer(conf); + geoIndexer = getGeoIndexer(conf); + temporalIndexer = getTemporalIndexer(conf); + entityIndexer = getEntityIndexer(conf); + ryaIndexer = getRyaIndexer(conf); + + // The entity index needs a batch writer -- typically it uses the DAO's, but decoupling + // them lets it be used with or without the core tables, like the other indexers. + if (entityIndexer != null) { + Connector conn; + try { + conn = ConfigUtils.getConnector(conf); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException("Error connecting to Accumulo for entity index output", e); + } + BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); + batchWriterConfig.setMaxMemory(RdfCloudTripleStoreConstants.MAX_MEMORY); + batchWriterConfig.setTimeout(RdfCloudTripleStoreConstants.MAX_TIME, TimeUnit.MILLISECONDS); + batchWriterConfig.setMaxWriteThreads(RdfCloudTripleStoreConstants.NUM_THREADS); + writer = conn.createMultiTableBatchWriter(batchWriterConfig); + entityIndexer.setMultiTableBatchWriter(writer); + } + + // update fields used for metrics + startTime = System.currentTimeMillis(); + lastCommitFinishTime = startTime; + + // set up the triple context + tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)); + } + + /** + * Write any buffered statements to Accumulo. + * @throws IOException if any indexer can't be flushed. + */ + @Override + public void flush() throws IOException { + flushBuffer(); + } + + /** + * Close all indexers. + */ + @Override + public void close() { + close(null); + } + + /** + * Close all indexers. + * @param paramTaskAttemptContext Unused. + */ + @Override + public void close(TaskAttemptContext paramTaskAttemptContext) { + // close everything. 
log errors + try { + flush(); + } catch (IOException e) { + logger.error("Error flushing the buffer on RyaOutputFormat Close", e); + } + try { + if (geoIndexer != null) + geoIndexer.close(); + } catch (IOException e) { + logger.error("Error closing the geoIndexer on RyaOutputFormat Close", e); + } + try { + if (freeTextIndexer != null) + freeTextIndexer.close(); + } catch (IOException e) { + logger.error("Error closing the freetextIndexer on RyaOutputFormat Close", e); + } + try { + if (temporalIndexer != null) + temporalIndexer.close(); + } catch (IOException e) { + logger.error("Error closing the temporalIndexer on RyaOutputFormat Close", e); + } + try { + if (entityIndexer != null) + entityIndexer.close(); + } catch (IOException e) { + logger.error("Error closing the entityIndexer on RyaOutputFormat Close", e); + } + try { + if (ryaIndexer != null) + ryaIndexer.destroy(); + } catch (RyaDAOException e) { + logger.error("Error closing RyaDAO on RyaOutputFormat Close", e); + } + if (writer != null) { + try { + writer.close(); + } catch (MutationsRejectedException e) { + logger.error("Error closing MultiTableBatchWriter on RyaOutputFormat Close", e); + } + } + } + + /** + * Write a {@link Statement} to Rya. Adds the statement to a buffer, and + * flushes the statement buffer to the database if full. + * @param statement Statement to insert to Rya. + * @throws IOException if writing to Accumulo fails. + */ + public void write(Statement statement) throws IOException { + write(RdfToRyaConversions.convertStatement(statement)); + } + + /** + * Writes a RyaStatement to Rya. Adds the statement to a buffer, and + * flushes the statement buffer to the database if full. + * @param ryaStatement Statement to insert to Rya. + * @throws IOException if writing to Accumulo fails. + */ + public void write(RyaStatement ryaStatement) throws IOException { + write(NullWritable.get(), new RyaStatementWritable(ryaStatement, tripleContext)); + } + + /** + * Writes a (key,value) pair to Rya. Adds the statement to a buffer, and + * flushes the statement buffer to the database if full. + * @param key Arbitrary Writable, not used. + * @param value Contains statement to insert to Rya. + * @throws IOException if writing to Accumulo fails. 
+ */ + @Override + public void write(Writable key, RyaStatementWritable value) throws IOException { + RyaStatement ryaStatement = value.getRyaStatement(); + if (ryaStatement.getColumnVisibility() == null) { + ryaStatement.setColumnVisibility(cv); + } + if (ryaStatement.getContext() == null) { + ryaStatement.setContext(defaultContext); + } + buffer.add(ryaStatement); + bufferCurrentSize += statementSize(ryaStatement); + if (bufferCurrentSize >= bufferSizeLimit) { + flushBuffer(); + } + } + + private int statementSize(RyaStatement ryaStatement) { + RyaURI subject = ryaStatement.getSubject(); + RyaURI predicate = ryaStatement.getPredicate(); + RyaType object = ryaStatement.getObject(); + RyaURI context = ryaStatement.getContext(); + int size = 3 + subject.getData().length() + predicate.getData().length() + object.getData().length(); + if (!XMLSchema.ANYURI.equals(object.getDataType())) { + size += 2 + object.getDataType().toString().length(); + } + if (context != null) { + size += context.getData().length(); + } + return size; + } + + // fields for storing metrics + private long startTime = 0; + private long lastCommitFinishTime = 0; + private long totalCommitRecords = 0; + + private double totalReadDuration = 0; + private double totalWriteDuration = 0; + + private long commitCount = 0; + + private void flushBuffer() throws IOException { + totalCommitRecords += buffer.size(); + commitCount++; + + long startCommitTime = System.currentTimeMillis(); + + logger.info(String.format("(C-%d) Flushing buffer with %,d objects and %,d bytes", commitCount, buffer.size(), + bufferCurrentSize)); + + double readingDuration = (startCommitTime - lastCommitFinishTime) / 1000.; + totalReadDuration += readingDuration; + double currentReadRate = buffer.size() / readingDuration; + double totalReadRate = totalCommitRecords / totalReadDuration; + + // Print "reading" metrics + logger.info(String.format("(C-%d) (Reading) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, readingDuration, + currentReadRate, totalReadRate)); + + // write to geo + if (geoIndexer != null) { + geoIndexer.storeStatements(buffer); + geoIndexer.flush(); + } + + // write to free text + if (freeTextIndexer != null) { + freeTextIndexer.storeStatements(buffer); + freeTextIndexer.flush(); + } + + // write to temporal + if (temporalIndexer != null) { + temporalIndexer.storeStatements(buffer); + temporalIndexer.flush(); + } + + // write to entity + if (entityIndexer != null && writer != null) { + entityIndexer.storeStatements(buffer); + try { + writer.flush(); + } catch (MutationsRejectedException e) { + throw new IOException("Error flushing data to Accumulo for entity indexing", e); + } + } + + // write to rya + try { + if (ryaIndexer != null) { + ryaIndexer.add(buffer.iterator()); + } + } catch (RyaDAOException e) { + logger.error("Cannot write statement to Rya", e); + throw new IOException(e); + } + + lastCommitFinishTime = System.currentTimeMillis(); + + double writingDuration = (lastCommitFinishTime - startCommitTime) / 1000.; + totalWriteDuration += writingDuration; + double currentWriteRate = buffer.size() / writingDuration; + double totalWriteRate = totalCommitRecords / totalWriteDuration; + + // Print "writing" stats + logger.info(String.format("(C-%d) (Writing) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, writingDuration, + currentWriteRate, totalWriteRate)); + + double processDuration = writingDuration + readingDuration; + double totalProcessDuration = totalWriteDuration + totalReadDuration; + double 
currentProcessRate = buffer.size() / processDuration; + double totalProcessRate = totalCommitRecords / (totalProcessDuration); + + // Print "total" stats + logger.info(String.format("(C-%d) (Total) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, processDuration, + currentProcessRate, totalProcessRate)); + + // clear the buffer + buffer.clear(); + bufferCurrentSize = 0L; + } + } +}
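
For orientation, the snippet below sketches how a job driver might wire up this OutputFormat using the static setters defined in the patch above. It is illustrative only and not part of the patch: the job name, the "rya_" prefix, the visibility string, and the example graph URI are placeholders, and the Configuration object (conf) is assumed to already carry the Accumulo/Rya connection settings.

    // Hypothetical driver fragment (not part of this patch), e.g. inside a Tool.run(...) implementation.
    Job job = Job.getInstance(conf, "rya-bulk-load");              // conf assumed to hold connection settings
    job.setNumReduceTasks(0);                                      // map-only load
    job.setOutputFormatClass(RyaOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);                     // keys are ignored by this OutputFormat
    job.setOutputValueClass(RyaStatementWritable.class);

    RyaOutputFormat.setTablePrefix(job, "rya_");                   // example prefix
    RyaOutputFormat.setDefaultVisibility(job, "U");                // used when a statement has no visibility
    RyaOutputFormat.setDefaultContext(job, "http://example.org/graph");
    RyaOutputFormat.setCoreTablesEnabled(job, true);
    RyaOutputFormat.setFreeTextEnabled(job, false);                // secondary indexes default to enabled;
    RyaOutputFormat.setGeoEnabled(job, false);                     // switch off the ones this job doesn't need
    RyaOutputFormat.setTemporalEnabled(job, false);
    RyaOutputFormat.setEntityEnabled(job, false);
    // RyaOutputFormat.setMockInstance(job, "test-instance");      // tests can target a mock Accumulo instance

Because checkOutputSpecs instantiates every enabled indexer, disabling the unused indexes up front keeps job submission from initializing them.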

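
A map task feeding this OutputFormat only needs to emit RyaStatementWritable values; the RyaStatementWritable(RyaStatement, RyaTripleContext) constructor used by RyaRecordWriter above is reused here. The sketch below is hypothetical rather than part of the patch: the whitespace-separated input format, the StatementMapper name, and the single-argument RyaType and three-argument RyaStatement constructors are assumptions for illustration.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import mvm.rya.accumulo.AccumuloRdfConfiguration;
    import mvm.rya.accumulo.mr.RyaStatementWritable;
    import mvm.rya.api.domain.RyaStatement;
    import mvm.rya.api.domain.RyaType;
    import mvm.rya.api.domain.RyaURI;
    import mvm.rya.api.resolver.RyaTripleContext;

    // Hypothetical mapper (not part of this patch): reads one "subject predicate object"
    // line per record and emits it as a RyaStatementWritable for RyaOutputFormat.
    public class StatementMapper extends Mapper<LongWritable, Text, NullWritable, RyaStatementWritable> {
        private RyaTripleContext tripleContext;

        @Override
        protected void setup(Context context) {
            // Same RyaTripleContext initialization pattern as RyaRecordWriter and RyaStatementRecordReader.
            tripleContext = RyaTripleContext.getInstance(
                    new AccumuloRdfConfiguration(context.getConfiguration()));
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split("\\s+", 3);
            if (parts.length < 3) {
                return; // skip malformed lines
            }
            // Assumes a (RyaURI, RyaURI, RyaType) constructor on RyaStatement.
            RyaStatement stmt = new RyaStatement(
                    new RyaURI(parts[0]), new RyaURI(parts[1]), new RyaType(parts[2]));
            context.write(NullWritable.get(), new RyaStatementWritable(stmt, tripleContext));
        }
    }

Visibility and context are deliberately left unset here; RyaRecordWriter.write fills in the job-wide defaults configured through setDefaultVisibility and setDefaultContext.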