Repository: incubator-rya Updated Branches: refs/heads/master c03c8bbef -> 86c866eda
RYA-266 Added init() and other calls wherever the Accumulo temporal indexer is created and used for storing. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/7dd5913a Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/7dd5913a Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/7dd5913a Branch: refs/heads/master Commit: 7dd5913acd29addfa3e55480d6b2dfe77824c205 Parents: c03c8bb Author: David W. Lotts <[email protected]> Authored: Fri Mar 31 14:05:07 2017 -0400 Committer: David Lotts <[email protected]> Committed: Thu Sep 21 12:38:56 2017 -0400 ---------------------------------------------------------------------- .../temporal/AccumuloTemporalIndexer.java | 84 ++++++++++++-------- .../temporal/AccumuloTemporalIndexerTest.java | 32 +++++--- .../apache/rya/accumulo/mr/RyaOutputFormat.java | 17 +++- .../rya/accumulo/mr/RyaOutputFormatTest.java | 25 +++--- 4 files changed, 102 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7dd5913a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java index fcc1c58..042d938 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java @@ -56,6 +56,7 @@ import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer; import 
org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.client.RyaClientException; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.resolver.RyaToRdfConversions; import org.apache.rya.indexing.KeyParts; @@ -102,17 +103,42 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements private boolean isInit = false; + /** + * intilize the temporal index. + * This is dependent on a few set method calls before init: + * > Connector = ConfigUtils.getConnector(conf); + * > MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); + * > // optional: temporal.setConnector(connector); + * > temporal.setMultiTableBatchWriter(mtbw); + * > temporal.init(); + */ + @Override + public void init() { + if (!isInit) { + try { + initInternal(); + isInit = true; + } catch (final AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException | RyaClientException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } + } + } private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, - TableExistsException { + TableExistsException, RyaClientException { + if (mtbw == null) + throw new RyaClientException("Failed to initialize temporal index, setMultiTableBatchWriter() was not set."); + if (conf == null) + throw new RyaClientException("Failed to initialize temporal index, setConf() was not set."); + temporalIndexTableName = getTableName(); // Create one index table on first run. 
- ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName); - - mtbw = ConfigUtils.createMultitableBatchWriter(conf); - + Boolean isCreated = ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName); + if (isCreated) { + logger.info("First run, created temporal index table: " + temporalIndexTableName); + } temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName); - validPredicates = ConfigUtils.getTemporalPredicates(conf); } @@ -120,24 +146,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements @Override public void setConf(final Configuration conf) { this.conf = conf; - if (!isInit) { - try { - initInternal(); - isInit = true; - } catch (final AccumuloException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final AccumuloSecurityException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final TableNotFoundException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final TableExistsException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } - } + } @Override @@ -158,16 +167,20 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements private void storeStatement(final Statement statement) throws IOException, IllegalArgumentException { // if the predicate list is empty, accept all predicates. 
// Otherwise, make sure the predicate is on the "valid" list - final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + final boolean isValidPredicate = validPredicates == null || validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); if (!isValidPredicate || !(statement.getObject() instanceof Literal)) { return; } + final DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval extractDateTime(statement, indexDateTimes); if (indexDateTimes[0]==null) { return; } + if (!this.isInit) + throw new RuntimeException("Method .init() was not called (or failed) before attempting to store statements."); + // Add this as an instant, or interval. try { if (indexDateTimes[1] != null) { @@ -865,8 +878,9 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements @Override public void close() throws IOException { try { - - mtbw.close(); + if (mtbw != null) { + mtbw.close(); + } } catch (final MutationsRejectedException e) { final String msg = "Error while closing the batch writer."; @@ -926,12 +940,6 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements } @Override - public void init() { - // TODO Auto-generated method stub - - } - - @Override public void setConnector(final Connector connector) { // TODO Auto-generated method stub @@ -954,4 +962,14 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements // TODO Auto-generated method stub } + + /* + * (non-Javadoc) + * + * @see org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer#setMultiTableBatchWriter(org.apache.accumulo.core.client.MultiTableBatchWriter) + */ + @Override + public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException { + mtbw = writer; + } } 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7dd5913a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java index 80824b8..839ef6a 100644 --- a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java +++ b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java @@ -21,8 +21,10 @@ package org.apache.rya.indexing.accumulo.temporal; import static org.apache.rya.api.resolver.RdfToRyaConversions.convertStatement; -import static org.junit.Assert.*; -import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.io.PrintStream; import java.security.NoSuchAlgorithmException; @@ -35,8 +37,12 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -48,8 +54,17 @@ import org.apache.commons.codec.binary.StringUtils; import org.apache.commons.io.output.NullOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import 
org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.indexing.StatementSerializer; +import org.apache.rya.indexing.TemporalInstant; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.TemporalInterval; +import org.apache.rya.indexing.accumulo.ConfigUtils; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -64,14 +79,6 @@ import org.openrdf.query.QueryEvaluationException; import com.beust.jcommander.internal.Lists; import info.aduna.iteration.CloseableIteration; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.indexing.StatementConstraints; -import org.apache.rya.indexing.StatementSerializer; -import org.apache.rya.indexing.TemporalInstant; -import org.apache.rya.indexing.TemporalInstantRfc3339; -import org.apache.rya.indexing.TemporalInterval; -import org.apache.rya.indexing.accumulo.ConfigUtils; /** * JUnit tests for TemporalIndexer and it's implementation AccumuloTemporalIndexer @@ -230,6 +237,11 @@ public final class AccumuloTemporalIndexerTest { tIndexer = new AccumuloTemporalIndexer(); tIndexer.setConf(conf); + Connector connector = ConfigUtils.getConnector(conf); + MultiTableBatchWriter mt_bw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); + tIndexer.setConnector(connector); + tIndexer.setMultiTableBatchWriter(mt_bw); + tIndexer.init(); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7dd5913a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java 
b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java index 155d694..1336364 100644 --- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java +++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java @@ -45,9 +45,6 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.log4j.Logger; -import org.openrdf.model.Statement; -import org.openrdf.model.vocabulary.XMLSchema; - import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.AccumuloRdfConstants; import org.apache.rya.accumulo.AccumuloRyaDAO; @@ -64,6 +61,8 @@ import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex; import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import org.openrdf.model.Statement; +import org.openrdf.model.vocabulary.XMLSchema; /** * {@link OutputFormat} that uses Rya, the {@link GeoIndexer}, the @@ -216,12 +215,22 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable return freeText; } - private static TemporalIndexer getTemporalIndexer(Configuration conf) { + private static TemporalIndexer getTemporalIndexer(Configuration conf) throws IOException { if (!conf.getBoolean(ENABLE_TEMPORAL, true)) { return null; } AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer(); temporal.setConf(conf); + Connector connector; + try { + connector = ConfigUtils.getConnector(conf); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException("Error when attempting to create a connection for writing the temporal index.", e); + } + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); + temporal.setConnector(connector); + 
temporal.setMultiTableBatchWriter(mtbw); + temporal.init(); return temporal; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7dd5913a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java index 0b44a92..96f57f6 100644 --- a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java +++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java @@ -5,7 +5,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; import org.apache.accumulo.core.client.mock.MockInstance; @@ -17,15 +19,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.openrdf.model.Statement; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.vocabulary.XMLSchema; - -import info.aduna.iteration.CloseableIteration; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.api.domain.RyaStatement; @@ -43,6 +36,15 @@ import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; import org.apache.rya.indexing.accumulo.freetext.SimpleTokenizer; import org.apache.rya.indexing.accumulo.freetext.Tokenizer; import 
org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; + +import info.aduna.iteration.CloseableIteration; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -235,6 +237,11 @@ public class RyaOutputFormatTest { } final AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer(); temporal.setConf(conf); + Connector connector = ConfigUtils.getConnector(conf); + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); + temporal.setConnector(connector); + temporal.setMultiTableBatchWriter(mtbw); + temporal.init(); final Set<Statement> empty = new HashSet<>(); final Set<Statement> head = new HashSet<>(); final Set<Statement> tail = new HashSet<>();
