RYA-142 Fixed Integration Tests for Fluo Update

Closes #104, closes #100


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/2139edb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/2139edb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/2139edb5

Branch: refs/heads/master
Commit: 2139edb5244c42f11c9440eced2fd0773846f856
Parents: 177c80a
Author: Caleb Meier <caleb.me...@parsons.com>
Authored: Wed Oct 12 12:55:27 2016 -0700
Committer: Aaron Mihalik <miha...@alum.mit.edu>
Committed: Thu Oct 13 12:57:09 2016 -0400

----------------------------------------------------------------------
 .../mvm/rya/api/client/accumulo/FluoITBase.java | 33 +++++-----
 .../apache/rya/indexing/pcj/fluo/ITBase.java    | 64 +++++++++-----------
 .../pcj/fluo/api/CountStatementsIT.java         |  6 +-
 .../pcj/fluo/integration/CreateDeleteIT.java    | 26 +++-----
 .../RyaInputIncrementalUpdateIT.java            |  3 +-
 5 files changed, 61 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java 
b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
index 0dcc04c..cc50b90 100644
--- a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
+++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java
@@ -55,14 +55,14 @@ import org.openrdf.sail.SailException;
 
 import com.google.common.io.Files;
 
-import io.fluo.api.client.FluoAdmin;
-import io.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import io.fluo.api.client.FluoAdmin.TableExistsException;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.api.mini.MiniFluo;
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.mini.MiniFluo;
 import mvm.rya.accumulo.AccumuloRdfConfiguration;
 import mvm.rya.api.client.RyaClientException;
 import mvm.rya.api.client.Install;
@@ -224,17 +224,16 @@ public abstract class FluoITBase {
      */
     protected MiniFluo startMiniFluo() throws AlreadyInitializedException, 
TableExistsException {
         // Setup the observers that will be used by the Fluo PCJ Application.
-        final List<ObserverConfiguration> observers = new ArrayList<>();
-        observers.add(new 
ObserverConfiguration(TripleObserver.class.getName()));
-        observers.add(new 
ObserverConfiguration(StatementPatternObserver.class.getName()));
-        observers.add(new ObserverConfiguration(JoinObserver.class.getName()));
-        observers.add(new 
ObserverConfiguration(FilterObserver.class.getName()));
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new 
ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(FilterObserver.class.getName()));
 
         // Provide export parameters child test classes may provide to the
         // export observer.
-        final ObserverConfiguration exportObserverConfig = new 
ObserverConfiguration(
-                QueryResultObserver.class.getName());
-        exportObserverConfig.setParameters(makeExportParams());
+        final ObserverSpecification exportObserverConfig = new 
ObserverSpecification(
+                QueryResultObserver.class.getName(), makeExportParams());
         observers.add(exportObserverConfig);
 
         // Configure how the mini fluo will run.
@@ -252,7 +251,7 @@ public abstract class FluoITBase {
         config.addObservers(observers);
 
         FluoFactory.newAdmin(config).initialize(
-                new 
FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+                new 
FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
         return FluoFactory.newMiniFluo(config);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
index 9e2b9c6..a5288ec 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -25,8 +25,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -37,6 +37,20 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.mini.MiniFluo;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
@@ -63,19 +77,7 @@ import org.openrdf.repository.RepositoryConnection;
 import org.openrdf.sail.Sail;
 
 import com.google.common.io.Files;
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
-import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
-import org.apache.fluo.api.mini.MiniFluo;
+
 import mvm.rya.accumulo.AccumuloRdfConfiguration;
 import mvm.rya.api.client.Install.InstallConfiguration;
 import mvm.rya.api.client.RyaClient;
@@ -283,17 +285,13 @@ public abstract class ITBase {
             final QueryMetadata queryMetadata = new 
FluoQueryMetadataDAO().readQueryMetadata(snapshot, queryId);
             final VariableOrder varOrder = queryMetadata.getVariableOrder();
 
-            // Fetch the Binding Sets for the query.
-            final ScannerConfiguration scanConfig = new ScannerConfiguration();
-            
scanConfig.fetchColumn(FluoQueryColumns.QUERY_BINDING_SET.getFamily(),
-                    FluoQueryColumns.QUERY_BINDING_SET.getQualifier());
-
+            CellScanner cellScanner = 
snapshot.scanner().fetch(FluoQueryColumns.QUERY_BINDING_SET).build();
             final BindingSetStringConverter converter = new 
BindingSetStringConverter();
 
-            final RowIterator rowIter = snapshot.get(scanConfig);
-            while (rowIter.hasNext()) {
-                final Entry<Bytes, ColumnIterator> row = rowIter.next();
-                final String bindingSetString = 
row.getValue().next().getValue().toString();
+           Iterator<RowColumnValue> iter = cellScanner.iterator();
+            
+            while (iter.hasNext()) {
+               final String bindingSetString = iter.next().getsValue();
                 final BindingSet bindingSet = 
converter.convert(bindingSetString, varOrder);
                 bindingSets.add(bindingSet);
             }
@@ -378,14 +376,11 @@ public abstract class ITBase {
      */
     protected MiniFluo startMiniFluo() throws AlreadyInitializedException, 
TableExistsException {
         // Setup the observers that will be used by the Fluo PCJ Application.
-        final List<ObserverConfiguration> observers = new ArrayList<>();
-        observers.add(new 
ObserverConfiguration(TripleObserver.class.getName()));
-        observers.add(new 
ObserverConfiguration(StatementPatternObserver.class.getName()));
-        observers.add(new ObserverConfiguration(JoinObserver.class.getName()));
-        observers.add(new 
ObserverConfiguration(FilterObserver.class.getName()));
-
-        // Configure the export observer to export new PCJ results to the mini 
accumulo cluster.
-        final ObserverConfiguration exportObserverConfig = new 
ObserverConfiguration(QueryResultObserver.class.getName());
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new 
ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(FilterObserver.class.getName()));
 
         final HashMap<String, String> exportParams = new HashMap<>();
         final RyaExportParameters ryaParams = new 
RyaExportParameters(exportParams);
@@ -395,8 +390,9 @@ public abstract class ITBase {
         ryaParams.setZookeeperServers(zookeepers);
         ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
         ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
-
-        exportObserverConfig.setParameters(exportParams);
+        
+        // Configure the export observer to export new PCJ results to the mini 
accumulo cluster.
+        final ObserverSpecification exportObserverConfig = new 
ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
         observers.add(exportObserverConfig);
 
         // Configure how the mini fluo will run.
@@ -414,7 +410,7 @@ public abstract class ITBase {
         config.addObservers(observers);
 
         FluoFactory.newAdmin(config).initialize(
-                       new 
FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+                       new 
FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
         return FluoFactory.newMiniFluo(config);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
index 630b86d..f7d4ee1 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java
@@ -34,7 +34,7 @@ import 
org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
 import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.mini.MiniFluo;
 import mvm.rya.api.domain.RyaStatement;
 import mvm.rya.api.domain.RyaURI;
@@ -53,7 +53,7 @@ public class CountStatementsIT extends ITBase {
     @Override
     protected MiniFluo startMiniFluo() throws AlreadyInitializedException, 
TableExistsException {
         // Setup the observers that will be used by the Fluo PCJ Application.
-        final List<ObserverConfiguration> observers = new ArrayList<>();
+        final List<ObserverSpecification> observers = new ArrayList<>();
 
         // Configure how the mini fluo will run.
         final FluoConfiguration config = new FluoConfiguration();
@@ -70,7 +70,7 @@ public class CountStatementsIT extends ITBase {
         config.addObservers(observers);
 
         FluoFactory.newAdmin(config).initialize(
-                new 
FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) );
+                new 
FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
         final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
         return miniFluo;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 0a1852b..d2ff98c 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -23,9 +23,14 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
@@ -39,14 +44,6 @@ import org.openrdf.query.impl.BindingImpl;
 
 import com.google.common.collect.Sets;
 
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.ScannerConfiguration;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.data.Span;
-import io.fluo.api.iterator.ColumnIterator;
-import io.fluo.api.iterator.RowIterator;
-
 public class CreateDeleteIT extends ITBase {
 
     /**
@@ -109,15 +106,12 @@ public class CreateDeleteIT extends ITBase {
     private List<Bytes> getFluoTableEntries(FluoClient fluoClient) {
         try (Snapshot snapshot = fluoClient.newSnapshot()) {
             List<Bytes> rows = new ArrayList<>();
+            RowScanner rscanner = 
snapshot.scanner().over(Span.prefix("")).byRow().build();
 
-            ScannerConfiguration sc1 = new ScannerConfiguration();
-            sc1.setSpan(Span.prefix(""));
-            RowIterator iterator = snapshot.get(sc1);
-
-            while (iterator.hasNext()) {
-                Entry<Bytes, ColumnIterator> row = iterator.next();
-                rows.add(row.getKey());
+            for(ColumnScanner cscanner: rscanner) {
+               rows.add(cscanner.getRow());
             }
+            
             return rows;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2139edb5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index bc83eed..82f568e 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
@@ -101,7 +102,7 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
         }
 
         fluo.waitForObservers();
-
+        
         final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, 
sparql);
         assertEquals(expected, results);
     }

Reply via email to