http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java new file mode 100644 index 0000000..e095309 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Value; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.hash.Hashing; + +/** + * This class adds or removes a hash to or from the rowId for sharding purposes. When creating a sharded row key, it + * takes the form: node_prefix:shardId:nodeId//Binding_values. For example, the row key generated from nodeId = SP_123, + * varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) + * indicates the shard id hash generated from the Binding value "uri:Bob". + * + */ +public class BindingHashShardingFunction { + + private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter(); + private static final int HASH_LEN = 4; + + /** + * Generates a sharded rowId. The rowId is of the form: node_prefix:shardId:nodeId//Binding_values. For + * example, the row key generated from nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be + * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the + * Binding value "uri:Bob". 
+ * + * @param nodeId - Node Id with type and UUID + * @param varOrder - VarOrder used to order BindingSet values + * @param bs - BindingSet with partially formed query values + * @return - serialized Bytes rowId for storing BindingSet results in Fluo + */ + public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) { + checkNotNull(nodeId); + checkNotNull(varOrder); + checkNotNull(bs); + String[] rowPrefixAndId = nodeId.split("_"); + Preconditions.checkArgument(rowPrefixAndId.length == 2); + String prefix = rowPrefixAndId[0]; + String id = rowPrefixAndId[1]; + + String firstBindingString = ""; + Bytes rowSuffix = Bytes.of(id); + if (varOrder.getVariableOrders().size() > 0) { + VariableOrder first = new VariableOrder(varOrder.getVariableOrders().get(0)); + firstBindingString = BS_CONVERTER.convert(bs, first); + rowSuffix = RowKeyUtil.makeRowKey(id, varOrder, bs); + } + + BytesBuilder builder = Bytes.builder(); + builder.append(Bytes.of(prefix + ":")); + builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + firstBindingString))); + builder.append(":"); + builder.append(rowSuffix); + return builder.toBytes(); + } + + /** + * Generates a sharded rowId. The rowId is of the form: node_prefix:shardId:nodeId//Binding_values. For + * example, the row key generated from nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be + * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the + * Binding value "uri:Bob". 
+ * + * @param nodeId - Node Id with type and UUID + * @param firstBsVal - String representation of the first BsValue + * @return - serialized Bytes prefix for scanning rows + */ + public static Bytes getShardedScanPrefix(String nodeId, Value firstBsVal) { + checkNotNull(nodeId); + checkNotNull(firstBsVal); + + final RyaType ryaValue = RdfToRyaConversions.convertValue(firstBsVal); + final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType(); + + return getShardedScanPrefix(nodeId, bindingString); + } + + /** + * Generates a sharded rowId from the indicated nodeId and bindingString. For example, the row key generated from + * nodeId = SP_123, varOrder = a;b, bs = [a = uri:Bob, b = uri:Doug] would be + * SP:HASH(uri:Bob):123//uri:Bob;uri:Doug, where HASH(uri:Bob) indicates the shard id hash generated from the + * Binding value "uri:Bob". + * + * @param nodeId - NodeId with tyep and UUID + * @param bindingString - String representation of BindingSet values, as formed by {@link BindingSetStringConverter} + * @return - serialized, sharded Bytes prefix + */ + public static Bytes getShardedScanPrefix(String nodeId, String bindingString) { + checkNotNull(nodeId); + checkNotNull(bindingString); + String[] rowPrefixAndId = nodeId.split("_"); + Preconditions.checkArgument(rowPrefixAndId.length == 2); + String prefix = rowPrefixAndId[0]; + String id = rowPrefixAndId[1]; + + BytesBuilder builder = Bytes.builder(); + builder.append(prefix + ":"); + builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + bindingString))); + builder.append(":"); + builder.append(id); + builder.append(NODEID_BS_DELIM); + builder.append(bindingString); + return builder.toBytes(); + } + + private static boolean hasHash(Bytes prefixBytes, Bytes row) { + for (int i = prefixBytes.length() + 1; i < prefixBytes.length() + HASH_LEN; i++) { + byte b = row.byteAt(i); + boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9'); + if (!isAlphaNum) { + return false; + } + } 
+ + if (row.byteAt(prefixBytes.length()) != ':' || row.byteAt(prefixBytes.length() + HASH_LEN + 1) != ':') { + return false; + } + + return true; + } + + /** + * @return Returns input with prefix and hash stripped from beginning. + */ + public static Bytes removeHash(Bytes prefixBytes, Bytes row) { + checkNotNull(prefixBytes); + checkNotNull(row); + checkArgument(row.length() >= prefixBytes.length() + 6, "Row is shorter than expected " + row); + checkArgument(row.subSequence(0, prefixBytes.length()).equals(prefixBytes), + "Row does not have expected prefix " + row); + checkArgument(hasHash(prefixBytes, row), "Row does not have expected hash " + row); + + BytesBuilder builder = Bytes.builder(); + builder.append(prefixBytes); + builder.append("_"); + builder.append(row.subSequence(prefixBytes.length() + 6, row.length())); + return builder.toBytes(); + } + + private static String genHash(Bytes row) { + int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt(); + hash = hash & 0x7fffffff; + // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is + // nice for debugging. + String hashString = Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0'); + hashString = hashString.substring(hashString.length() - HASH_LEN); + return hashString; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java new file mode 100644 index 0000000..d451d28 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TRIPLE_PREFIX; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaStatement; + +/** + * This class is a utility class for adding and removing the Triple prefix to + * Statements ingested into the Rya Fluo application. The Triple prefix is added + * to support range based compactions for removing transient data from the Fluo + * application. This prefix supports the Transient data recipe described on + * the Fluo website, and reduces the computational load on the system by cleaning up + * old deleted Triples and notifications using a targeted range compaction. + * + */ +public class TriplePrefixUtils { + + private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of(TRIPLE_PREFIX + NODEID_BS_DELIM); + + /** + * Prepends the triple prefix to the provided bytes and returns the new value as a {@link Bytes}. + * @param tripleBytes - serialized {@link RyaStatement} + * @return - serialized RyaStatement with prepended triple prefix, converted to Bytes + */ + public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) { + checkNotNull(tripleBytes); + BytesBuilder builder = Bytes.builder(); + return builder.append(TRIPLE_PREFIX_BYTES).append(tripleBytes).toBytes(); + } + + /** + * Removes the triple prefix and returns the new value as a byte array. 
+ * @param prefixedTriple - serialized RyaStatement with prepended triple prefix, converted to Bytes + * @return - serialized {@link RyaStatement} in byte array form + */ + public static byte[] removeTriplePrefixAndConvertToByteArray(Bytes prefixedTriple) { + checkNotNull(prefixedTriple); + return prefixedTriple.subSequence(TRIPLE_PREFIX_BYTES.length(), prefixedTriple.length()).toArray(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java index 3df3708..f0faf1f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCacheTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.rya.indexing.pcj.fluo.app.query; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java new file mode 100644 index 0000000..657b548 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; +import static org.mockito.Mockito.when; + +import java.util.Set; + +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.Sets; + +public class StatementPatternIdCacheTest { + + @Test + public void testCache() { + Transaction mockTx = Mockito.mock(Transaction.class); + String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN); + when(mockTx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS)).thenReturn(Bytes.of(nodeId)); + when(mockTx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH)).thenReturn(Bytes.of("123")); + + StatementPatternIdCache cache = new StatementPatternIdCache(); + Set<String> ids1 = cache.getStatementPatternIds(mockTx); + Set<String> ids2 = cache.getStatementPatternIds(mockTx); + + Assert.assertEquals(ids1, ids2); + Assert.assertEquals(Sets.newHashSet(nodeId), ids1); + + Mockito.verify(mockTx, Mockito.times(1)).get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java new file mode 100644 index 0000000..729cdeb --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunctionTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; + +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Assert; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +public class BindingHashShardingFunctionTest { + + private static final ValueFactory vf = new ValueFactoryImpl(); + + @Test + public void shardAddAndRemoveTest() { + String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN); + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("entity", vf.createURI("urn:entity")); + bs.addBinding("location", vf.createLiteral("location_1")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs); + VariableOrder varOrder = new VariableOrder("entity","location"); + Bytes row = RowKeyUtil.makeRowKey(nodeId, varOrder, vBs); + Bytes shardedRow = BindingHashShardingFunction.addShard(nodeId, varOrder, vBs); + Bytes shardlessRow = BindingHashShardingFunction.removeHash(Bytes.of(SP_PREFIX), shardedRow); + Assert.assertEquals(row, shardlessRow); + } + + @Test + public void bindingSetRowTest() { + String nodeId = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN); + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("entity", vf.createURI("urn:entity")); + bs.addBinding("location", vf.createLiteral("location_1")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs); + VariableOrder varOrder = new VariableOrder("entity","location"); + Bytes row = RowKeyUtil.makeRowKey(nodeId, varOrder, vBs); + Bytes shardedRow = 
BindingHashShardingFunction.addShard(nodeId, varOrder, vBs); + BindingSetRow expected = BindingSetRow.make(row); + BindingSetRow actual = BindingSetRow.makeFromShardedRow(Bytes.of(SP_PREFIX), shardedRow); + Assert.assertEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java new file mode 100644 index 0000000..ac41dc8 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtilsTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import java.util.Arrays; + +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.api.resolver.triple.TripleRowResolverException; +import org.junit.Assert; +import org.junit.Test; + +public class TriplePrefixUtilsTest { + + @Test + public void testAddRemovePrefix() throws TripleRowResolverException { + byte[] expected = Bytes.of("triple").toArray(); + Bytes fluoBytes = TriplePrefixUtils.addTriplePrefixAndConvertToBytes(expected); + byte[] returned = TriplePrefixUtils.removeTriplePrefixAndConvertToByteArray(fluoBytes); + Assert.assertEquals(true, Arrays.equals(expected, returned)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml index c038dbe..e594474 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -1,14 +1,21 @@ -<?xml version="1.0" encoding="utf-8"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - you under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. 
--> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java index 66aa04b..7e882e9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java @@ -41,7 +41,7 @@ import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; @@ -54,13 +54,16 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; -import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.algebra.evaluation.QueryBindingSet; import com.google.common.base.Optional; @@ -70,6 +73,7 @@ public class BatchIT extends RyaExportITBase { private static final Logger log = Logger.getLogger(BatchIT.class); private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + private static final ValueFactory vf = new ValueFactoryImpl(); @Test public void simpleScanDelete() throws Exception { @@ -89,16 +93,16 @@ public class BatchIT extends RyaExportITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. 
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()).getQueryId(); + String queryId = new CreateFluoPcj() + .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); List<String> prefixes = Arrays.asList("urn:subject_1", "urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1"); // Stream the data into Fluo. InsertTriples inserter = new InsertTriples(); - inserter.insert(fluoClient, statements1, Optional.<String> absent()); - inserter.insert(fluoClient, statements2, Optional.<String> absent()); + inserter.insert(fluoClient, statements1, Optional.absent()); + inserter.insert(fluoClient, statements2, Optional.absent()); // Verify the end results of the query match the expected results. getMiniFluo().waitForObservers(); @@ -129,22 +133,26 @@ public class BatchIT extends RyaExportITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. 
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()).getQueryId(); + String queryId = new CreateFluoPcj() + .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); String joinId = ids.get(2); String rightSp = ids.get(4); QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("subject", new URIImpl("urn:subject_1")); - bs.addBinding("object1", new URIImpl("urn:object_0")); + bs.addBinding("subject", vf.createURI("urn:subject_1")); + bs.addBinding("object1", vf.createURI("urn:object_0")); VisibilityBindingSet vBs = new VisibilityBindingSet(bs); - Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1")); + + //create sharded span for deletion + URI uri = vf.createURI("urn:subject_1"); + Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(rightSp, uri); + Span span = Span.prefix(prefixBytes); // Stream the data into Fluo. InsertTriples inserter = new InsertTriples(); - inserter.insert(fluoClient, statements1, Optional.<String> absent()); - inserter.insert(fluoClient, statements2, Optional.<String> absent()); + inserter.insert(fluoClient, statements1, Optional.absent()); + inserter.insert(fluoClient, statements2, Optional.absent()); getMiniFluo().waitForObservers(); verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 25, 5, 5)); @@ -175,21 +183,24 @@ public class BatchIT extends RyaExportITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. 
- String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()).getQueryId(); + String queryId = new CreateFluoPcj() + .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); String joinId = ids.get(2); String rightSp = ids.get(4); QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("subject", new URIImpl("urn:subject_1")); - bs.addBinding("object1", new URIImpl("urn:object_0")); + bs.addBinding("subject", vf.createURI("urn:subject_1")); + bs.addBinding("object1", vf.createURI("urn:object_0")); VisibilityBindingSet vBs = new VisibilityBindingSet(bs); - Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1")); + + URI uri = vf.createURI("urn:subject_1"); + Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(rightSp, uri); + Span span = Span.prefix(prefixBytes); // Stream the data into Fluo. InsertTriples inserter = new InsertTriples(); - inserter.insert(fluoClient, statements2, Optional.<String> absent()); + inserter.insert(fluoClient, statements2, Optional.absent()); getMiniFluo().waitForObservers(); verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0, 5)); @@ -214,7 +225,7 @@ public class BatchIT extends RyaExportITBase { RyaURI subj = new RyaURI("urn:subject_1"); RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); - + Set<RyaStatement> statements1 = getRyaStatements(statement1, 15); Set<RyaStatement> statements2 = getRyaStatements(statement2, 15); @@ -224,22 +235,21 @@ public class BatchIT extends RyaExportITBase { // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and // batch size of joins to 5. 
- String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()).getQueryId(); + String queryId = new CreateFluoPcj(5, 5) + .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); // Stream the data into Fluo. InsertTriples inserter = new InsertTriples(); - inserter.insert(fluoClient, statements1, Optional.<String> absent()); - inserter.insert(fluoClient, statements2, Optional.<String> absent()); + inserter.insert(fluoClient, statements1, Optional.absent()); + inserter.insert(fluoClient, statements2, Optional.absent()); getMiniFluo().waitForObservers(); verifyCounts(fluoClient, ids, Arrays.asList(225, 225, 225, 15, 15)); } } - - + @Test public void leftJoinBatchIntegrationTest() throws Exception { final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " @@ -249,10 +259,10 @@ public class BatchIT extends RyaExportITBase { RyaURI subj = new RyaURI("urn:subject_1"); RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); - + subj = new RyaURI("urn:subject_2"); RyaStatement statement3 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); - + Set<RyaStatement> statements1 = getRyaStatements(statement1, 10); Set<RyaStatement> statements2 = getRyaStatements(statement2, 10); Set<RyaStatement> statements3 = getRyaStatements(statement3, 10); @@ -263,37 +273,35 @@ public class BatchIT extends RyaExportITBase { // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and // batch size of joins to 5. 
- String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()).getQueryId(); + String queryId = new CreateFluoPcj(5, 5) + .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); // Stream the data into Fluo. InsertTriples inserter = new InsertTriples(); - inserter.insert(fluoClient, statements1, Optional.<String> absent()); - inserter.insert(fluoClient, statements2, Optional.<String> absent()); - inserter.insert(fluoClient, statements3, Optional.<String> absent()); + inserter.insert(fluoClient, statements1, Optional.absent()); + inserter.insert(fluoClient, statements2, Optional.absent()); + inserter.insert(fluoClient, statements3, Optional.absent()); getMiniFluo().waitForObservers(); verifyCounts(fluoClient, ids, Arrays.asList(110, 110, 110, 20, 10)); } } - - + @Test public void multiJoinBatchIntegrationTest() throws Exception { final String sparql = "SELECT ?subject1 ?subject2 ?object1 ?object2 WHERE { ?subject1 <urn:predicate_1> ?object1; " - + " <urn:predicate_2> ?object2 ." - + " ?subject2 <urn:predicate_3> ?object2 } "; + + " <urn:predicate_2> ?object2 ." 
+ " ?subject2 <urn:predicate_3> ?object2 } "; try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { RyaURI subj1 = new RyaURI("urn:subject_1"); RyaStatement statement1 = new RyaStatement(subj1, new RyaURI("urn:predicate_1"), null); RyaStatement statement2 = new RyaStatement(subj1, new RyaURI("urn:predicate_2"), null); - + Set<RyaStatement> statements1 = getRyaStatements(statement1, 10); Set<RyaStatement> statements2 = getRyaStatements(statement2, 10); - + RyaURI subj2 = new RyaURI("urn:subject_2"); RyaStatement statement3 = new RyaStatement(subj2, new RyaURI("urn:predicate_3"), null); Set<RyaStatement> statements3 = getRyaStatements(statement3, 10); @@ -304,23 +312,22 @@ public class BatchIT extends RyaExportITBase { // Tell the Fluo app to maintain the PCJ and sets batch scan size for StatementPatterns to 5 and // batch size of joins to 5. - String queryId = new CreateFluoPcj(5, 5).withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), - getRyaInstanceName()).getQueryId(); + String queryId = new CreateFluoPcj(5, 5) + .withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()).getQueryId(); List<String> ids = getNodeIdStrings(fluoClient, queryId); // Stream the data into Fluo. 
InsertTriples inserter = new InsertTriples(); - inserter.insert(fluoClient, statements1, Optional.<String> absent()); - inserter.insert(fluoClient, statements2, Optional.<String> absent()); - inserter.insert(fluoClient, statements3, Optional.<String> absent()); + inserter.insert(fluoClient, statements1, Optional.absent()); + inserter.insert(fluoClient, statements2, Optional.absent()); + inserter.insert(fluoClient, statements3, Optional.absent()); getMiniFluo().waitForObservers(); verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 100, 100, 10, 10, 10)); } } - private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) { Set<RyaStatement> statements = new HashSet<>(); @@ -361,13 +368,19 @@ public class BatchIT extends RyaExportITBase { for (int i = 0; i < ids.size(); i++) { String id = ids.get(i); String bsPrefix = prefixes.get(i); + URI uri = vf.createURI(bsPrefix); + Bytes prefixBytes = BindingHashShardingFunction.getShardedScanPrefix(id, uri); NodeType type = NodeType.fromNodeId(id).get(); Column bsCol = type.getResultColumn(); - String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix; - Span span = Span.prefix(Bytes.of(row)); - BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span) - .build(); - BatchInformationDAO.addBatch(tx, id, batch); + SpanBatchDeleteInformation.Builder builder = SpanBatchDeleteInformation.builder().setBatchSize(batchSize) + .setColumn(bsCol); + if (type == NodeType.JOIN) { + builder.setSpan(Span.prefix(type.getNodeTypePrefix())); + builder.setNodeId(java.util.Optional.of(id)); + } else { + builder.setSpan(Span.prefix(prefixBytes)); + } + BatchInformationDAO.addBatch(tx, id, builder.build()); } tx.commit(); } @@ -383,14 +396,19 @@ public class BatchIT extends RyaExportITBase { private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) { try (Transaction tx = fluoClient.newTransaction()) { int count = 0; - RowScanner 
scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build(); + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + Bytes prefixBytes = Bytes.of(type.get().getNodeTypePrefix()); + RowScanner scanner = tx.scanner().over(Span.prefix(prefixBytes)).fetch(bsColumn).byRow().build(); Iterator<ColumnScanner> colScanners = scanner.iterator(); while (colScanners.hasNext()) { ColumnScanner colScanner = colScanners.next(); - Iterator<ColumnValue> vals = colScanner.iterator(); - while (vals.hasNext()) { - vals.next(); - count++; + BindingSetRow bsRow = BindingSetRow.makeFromShardedRow(prefixBytes, colScanner.getRow()); + if (bsRow.getNodeId().equals(nodeId)) { + Iterator<ColumnValue> vals = colScanner.iterator(); + while (vals.hasNext()) { + vals.next(); + count++; + } } } tx.commit(); @@ -405,7 +423,6 @@ public class BatchIT extends RyaExportITBase { int expected = expectedCounts.get(i); NodeType type = NodeType.fromNodeId(id).get(); int count = countResults(fluoClient, id, type.getResultColumn()); - log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected); switch (type) { case STATEMENT_PATTERN: assertEquals(expected, count); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index 27b8222..ef5ab34 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -76,17 +76,18 @@ public 
class CreateDeleteIT extends RyaExportITBase { // Create the PCJ in Fluo and load the statements into Rya. final String pcjId = loadData(sparql, statements); - try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + try(FluoClient fluoClient = FluoFactory.newClient(getFluoConfiguration())) { // Ensure the data was loaded. final List<Bytes> rows = getFluoTableEntries(fluoClient); - assertEquals(18, rows.size()); + assertEquals(19, rows.size()); // Delete the PCJ from the Fluo application. new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId); + getMiniFluo().waitForObservers(); // Ensure all data related to the query has been removed. final List<Bytes> empty_rows = getFluoTableEntries(fluoClient); - assertEquals(0, empty_rows.size()); + assertEquals(1, empty_rows.size()); } } @@ -108,20 +109,21 @@ public class CreateDeleteIT extends RyaExportITBase { // Create the PCJ in Fluo and load the statements into Rya. final String pcjId = loadData(sparql, statements); - try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + try(FluoClient fluoClient = FluoFactory.newClient(getFluoConfiguration())) { // Ensure the data was loaded. final List<Bytes> rows = getFluoTableEntries(fluoClient); - assertEquals(10, rows.size()); + assertEquals(11, rows.size()); // Delete the PCJ from the Fluo application. new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId); + getMiniFluo().waitForObservers(); // Ensure all data related to the query has been removed. final List<Bytes> empty_rows = getFluoTableEntries(fluoClient); - assertEquals(0, empty_rows.size()); + assertEquals(1, empty_rows.size()); } } - + private String loadData(final String sparql, final Collection<Statement> statements) throws Exception { requireNonNull(sparql); @@ -133,14 +135,14 @@ public class CreateDeleteIT extends RyaExportITBase { final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet()); // Write the data to Rya. 
- final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); + final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection(); ryaConn.begin(); ryaConn.add(statements); ryaConn.commit(); ryaConn.close(); // Wait for the Fluo application to finish computing the end result. - super.getMiniFluo().waitForObservers(); + getMiniFluo().waitForObservers(); // The PCJ Id is the topic name the results will be written to. return pcjId; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java index e61104a..eb21b34 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java @@ -109,11 +109,11 @@ public class CreateDeletePeriodicPCJ extends KafkaExportITBase { vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4"))); - runTest(query, statements, 29); + runTest(query, statements, 30); } - + private void runTest(String query, Collection<Statement> statements, int expectedEntries) throws Exception { try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { @@ -134,10 +134,11 @@ public class CreateDeletePeriodicPCJ extends KafkaExportITBase { DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, storage); 
deletePeriodic.deletePeriodicQuery(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notificationClient); + getMiniFluo().waitForObservers(); // Ensure all data related to the query has been removed. final List<Bytes> empty_rows = getFluoTableEntries(fluoClient); - assertEquals(0, empty_rows.size()); + assertEquals(1, empty_rows.size()); // Ensure that Periodic Service notified to add and delete PeriodicNotification Set<CommandNotification> notifications; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java deleted file mode 100644 index fabf512..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/FluoLatencyIT.java +++ /dev/null @@ -1,169 +0,0 @@ -package org.apache.rya.indexing.pcj.fluo.integration; - -import static java.util.Objects.requireNonNull; - -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.xml.datatype.DatatypeConfigurationException; -import javax.xml.datatype.DatatypeFactory; - -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.client.Transaction; -import 
org.apache.fluo.api.client.scanner.CellScanner; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.data.RowColumnValue; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.rya.api.client.CreatePCJ.ExportStrategy; -import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; -import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase; -import org.junit.BeforeClass; -import org.junit.Test; -import org.openrdf.model.Literal; -import org.openrdf.model.Statement; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.repository.sail.SailRepositoryConnection; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -public class FluoLatencyIT extends KafkaExportITBase { - private static ValueFactory vf; - private static DatatypeFactory dtf; - - @BeforeClass - public static void init() throws DatatypeConfigurationException { - vf = new ValueFactoryImpl(); - dtf = DatatypeFactory.newInstance(); - } - - @Test - public void resultsExported() throws Exception { - - final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type (count(?obs) as ?total) where { " - + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "} " + "group by ?type"; - -// final String sparql = "prefix time: <http://www.w3.org/2006/time#> " + "select ?type ?obs where { " -// + " ?obs <uri:hasTime> ?time. " + " ?obs <uri:hasObsType> ?type " + "}"; - - try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { - // Tell the Fluo app to maintain the PCJ. 
- String pcjId = FluoQueryUtils.createNewPcjId(); - FluoConfiguration conf = super.getFluoConfiguration(); - new CreateFluoPcj().createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluoClient); - SailRepositoryConnection conn = super.getRyaSailRepository().getConnection(); - - long start = System.currentTimeMillis(); - int numReturned = 0; - int numObs = 10; - int numTypes = 5; - int numExpected = 0; - int increment = numObs*numTypes; - while (System.currentTimeMillis() - start < 60000) { - List<Statement> statements = generate(10, 5, "car_", numExpected, ZonedDateTime.now()); - conn.add(statements); - numExpected += increment; - System.out.println("Num Accumulo Entries: " + getNumAccEntries(conf.getAccumuloTable()) + " Num Fluo Entries: " - + getNumFluoEntries(fluoClient)); - numReturned += readAllResults(pcjId).size(); - System.out - .println("Expected: " + numExpected + " NumReturned: " + numReturned + " Difference: " + (numExpected - numReturned)); -// FluoITHelper.printFluoTable(conf); - Thread.sleep(30000); - } - } - } - - /** - * Generates (numObservationsPerType x numTypes) statements of the form: - * - * <pre> - * urn:obs_n uri:hasTime zonedTime - * urn:obs_n uri:hasObsType typePrefix_m - * </pre> - * - * Where the n in urn:obs_n is the ith value in 0 to (numObservationsPerType x numTypes) with an offset specified by - * observationOffset, and where m in typePrefix_m is the jth value in 0 to numTypes. - * - * @param numObservationsPerType - The quantity of observations per type to generate. - * @param numTypes - The number of types to generate observations for. - * @param typePrefix - The prefix to be used for the type literal in the statement. - * @param observationOffset - The offset to be used for determining the value of n in the above statements. - * @param zonedTime - The time to be used for all observations generated. - * @return A new list of all generated Statements. 
- */ - public List<Statement> generate(final long numObservationsPerType, final int numTypes, final String typePrefix, - final long observationOffset, final ZonedDateTime zonedTime) { - final String time = zonedTime.format(DateTimeFormatter.ISO_INSTANT); - final Literal litTime = vf.createLiteral(dtf.newXMLGregorianCalendar(time)); - final List<Statement> statements = Lists.newArrayList(); - - for (long i = 0; i < numObservationsPerType; i++) { - for (int j = 0; j < numTypes; j++) { - final long observationId = observationOffset + i * numTypes + j; - // final String obsId = "urn:obs_" + Long.toHexString(observationId) + "_" + observationId; - // final String obsId = "urn:obs_" + observationId; - final String obsId = "urn:obs_" + String.format("%020d", observationId); - final String type = typePrefix + j; - // logger.info(obsId + " " + type + " " + litTime); - statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasTime"), litTime)); - statements.add(vf.createStatement(vf.createURI(obsId), vf.createURI("uri:hasObsType"), vf.createLiteral(type))); - } - } - - return statements; - } - - private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception { - requireNonNull(pcjId); - - // Read all of the results from the Kafka topic. 
- final Set<VisibilityBindingSet> results = new HashSet<>(); - - try (final KafkaConsumer<String, VisibilityBindingSet> consumer = makeConsumer(pcjId)) { - final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(5000); - final Iterator<ConsumerRecord<String, VisibilityBindingSet>> recordIterator = records.iterator(); - while (recordIterator.hasNext()) { - results.add(recordIterator.next().value()); - } - } - - return results; - } - - private int getNumAccEntries(String tableName) throws TableNotFoundException { - Scanner scanner = super.getAccumuloConnector().createScanner(tableName, new Authorizations()); - int count = 0; - for (Map.Entry<Key, Value> entry : scanner) { - count++; - } - return count; - } - - private int getNumFluoEntries(FluoClient client) { - Transaction tx = client.newTransaction(); - CellScanner scanner = tx.scanner().build(); - int count = 0; - for (RowColumnValue rcv : scanner) { - count++; - } - return count; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index beaef32..2b87a97 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -123,9 +123,8 @@ public class QueryIT extends RyaExportITBase { // and are skilled with computers. The resulting binding set includes everybody who // was involved in the recruitment process. 
final String sparql = "SELECT ?recruiter ?candidate ?leader " + "{ " + "?recruiter <http://recruiterFor> <http://GeekSquad>. " - + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". " - + "?leader <http://leaderOf> <http://GeekSquad>. " + "?recruiter <http://talksTo> ?candidate. " - + "?candidate <http://talksTo> ?leader. " + "}"; + + "?recruiter <http://talksTo> ?candidate. " + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". " + + "?candidate <http://talksTo> ?leader." + "?leader <http://leaderOf> <http://GeekSquad>. }"; // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); @@ -426,10 +425,10 @@ public class QueryIT extends RyaExportITBase { runTest(sparql, statements, expectedResults, ExportStrategy.RYA); } - + @Test public void dateTimeWithin() throws Exception { - + final ValueFactory vf = new ValueFactoryImpl(); DatatypeFactory dtf = DatatypeFactory.newInstance(); FunctionRegistry.getInstance().add(new DateTimeWithinPeriod()); @@ -437,13 +436,13 @@ public class QueryIT extends RyaExportITBase { final String sparql = "PREFIX fn: <" + FN.NAMESPACE +">" + "SELECT ?event ?startTime ?endTime WHERE { ?event <uri:startTime> ?startTime; <uri:endTime> ?endTime. " + "FILTER(fn:dateTimeWithin(?startTime, ?endTime, 2,<" + OWLTime.HOURS_URI + "> ))}"; - + ZonedDateTime zTime = ZonedDateTime.now(); String time = zTime.format(DateTimeFormatter.ISO_INSTANT); ZonedDateTime zTime1 = zTime.minusHours(1); String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime2 = zTime.minusHours(2); String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); @@ -471,10 +470,10 @@ public class QueryIT extends RyaExportITBase { // Verify the end results of the query match the expected results. 
runTest(sparql, statements, expectedResults, ExportStrategy.RYA); } - + @Test public void dateTimeWithinNow() throws Exception { - + final ValueFactory vf = new ValueFactoryImpl(); DatatypeFactory dtf = DatatypeFactory.newInstance(); FunctionRegistry.getInstance().add(new DateTimeWithinPeriod()); @@ -482,13 +481,13 @@ public class QueryIT extends RyaExportITBase { final String sparql = "PREFIX fn: <" + FN.NAMESPACE +">" + "SELECT ?event ?startTime WHERE { ?event <uri:startTime> ?startTime. " + "FILTER(fn:dateTimeWithin(?startTime, NOW(), 15, <" + OWLTime.SECONDS_URI + "> ))}"; - + ZonedDateTime zTime = ZonedDateTime.now(); String time = zTime.format(DateTimeFormatter.ISO_INSTANT); ZonedDateTime zTime1 = zTime.minusSeconds(30); String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - + Literal lit = vf.createLiteral(dtf.newXMLGregorianCalendar(time)); Literal lit1 = vf.createLiteral(dtf.newXMLGregorianCalendar(time1)); @@ -511,7 +510,7 @@ public class QueryIT extends RyaExportITBase { } - + @Test public void periodicQueryTestWithoutAggregation() throws Exception { String query = "prefix function: <http://org.apache.rya/function#> " // n @@ -800,8 +799,8 @@ public class QueryIT extends RyaExportITBase { // Verify the end results of the query match the expected results. runTest(query, statements, expectedResults, ExportStrategy.PERIODIC); } - - + + @Test public void nestedPeriodicQueryTestWithAggregationAndGroupBy() throws Exception { String query = "prefix function: <http://org.apache.rya/function#> " // n @@ -879,7 +878,7 @@ public class QueryIT extends RyaExportITBase { // Verify the end results of the query match the expected results. 
runTest(query, statements, expectedResults, ExportStrategy.PERIODIC); } - + @Test public void nestedJoinPeriodicQueryWithAggregationAndGroupBy() throws Exception { String query = "prefix function: <http://org.apache.rya/function#> " // n @@ -1006,6 +1005,7 @@ public class QueryIT extends RyaExportITBase { // Ensure the result of the query matches the expected result. assertEquals(expectedResults, results); } + break; case PERIODIC: PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName()); @@ -1014,7 +1014,7 @@ public class QueryIT extends RyaExportITBase { new CreateFluoPcj().createPcj(periodicId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluo); } addStatementsAndWait(statements); - + final Set<BindingSet> results = Sets.newHashSet(); try (CloseableIterator<BindingSet> resultIter = periodicStorage.listResults(periodicId, Optional.empty())) { while (resultIter.hasNext()) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java new file mode 100644 index 0000000..91c9977 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StatementPatternIdCacheIT.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.integration; + +import static org.junit.Assert.assertEquals; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCache; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; +import org.junit.Test; + +import com.google.common.collect.Sets; + +public class StatementPatternIdCacheIT extends RyaExportITBase { + + /** + * Ensure streamed matches are included in the result. + */ + @Test + public void statementPatternIdCacheTest() throws Exception { + // A query that finds people who talk to Eve and work at Chipotle. + final String sparql1 = + "SELECT ?x WHERE { " + + "?x <urn:pred1> <urn:obj1>. " + + "?x <urn:pred2> <urn:obj2>." + + "}"; + + final String sparql2 = + "SELECT ?x WHERE { " + + "?x <urn:pred3> <urn:obj3>. " + + "?x <urn:pred4> <urn:obj4>." 
+ + "}"; + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + + String pcjId = FluoQueryUtils.createNewPcjId(); + // Tell the Fluo app to maintain the PCJ. + FluoQuery query1 = new CreateFluoPcj().createPcj(pcjId, sparql1, new HashSet<>(), fluoClient); + Set<String> spIds1 = new HashSet<>(); + for(StatementPatternMetadata metadata: query1.getStatementPatternMetadata()) { + spIds1.add(metadata.getNodeId()); + } + + StatementPatternIdCache cache = new StatementPatternIdCache(); + + assertEquals(spIds1, cache.getStatementPatternIds(fluoClient.newTransaction())); + + FluoQuery query2 = new CreateFluoPcj().createPcj(pcjId, sparql2, new HashSet<>(), fluoClient); + Set<String> spIds2 = new HashSet<>(); + for(StatementPatternMetadata metadata: query2.getStatementPatternMetadata()) { + spIds2.add(metadata.getNodeId()); + } + + assertEquals(Sets.union(spIds1, spIds2), cache.getStatementPatternIds(fluoClient.newTransaction())); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties new file mode 100644 index 0000000..a5086ee --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Valid levels: +# TRACE, DEBUG, INFO, WARN, ERROR and FATAL +log4j.rootLogger=INFO, CONSOLE + +# Set independent logging levels +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.kafka=WARN +log4j.logger.org.apache.kafka=WARN +#log4j.logger.org.apache.rya.indexing.pcj.fluo=DEBUG + +# LOGFILE is set to be a File appender using a PatternLayout. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +#log4j.appender.CONSOLE.Threshold=DEBUG + +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n + +#log4j.appender.default.file=org.apache.log4j.FileAppender +#log4j.appender.default.file.file=/home/cloudera/Desktop/log.out +#log4j.appender.default.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.default.file.layout.ConversionPattern=%d [%t] %-5p %c - %m%n + +#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout +#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java index 32ee962..48334d0 100644 --- 
a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java @@ -48,41 +48,42 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.minicluster.MiniAccumuloCluster; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.MiniAccumuloClusterInstance; -import org.apache.rya.accumulo.MiniAccumuloSingleton; -import org.apache.rya.accumulo.RyaTestInstanceRule; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; -import org.apache.rya.api.client.accumulo.AccumuloInstall; -import org.apache.zookeeper.ClientCnxn; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.openrdf.repository.RepositoryConnection; -import org.openrdf.repository.RepositoryException; -import org.openrdf.sail.Sail; -import org.openrdf.sail.SailException; - import org.apache.fluo.api.client.FluoAdmin; import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.mini.MiniFluo; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.accumulo.MiniAccumuloClusterInstance; +import org.apache.rya.accumulo.MiniAccumuloSingleton; +import org.apache.rya.accumulo.RyaTestInstanceRule; import org.apache.rya.api.client.Install; import org.apache.rya.api.client.Install.DuplicateInstanceNameException; import org.apache.rya.api.client.Install.InstallConfiguration; +import 
org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloInstall; import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier; import org.apache.rya.rdftriplestore.RyaSailRepository; import org.apache.rya.rdftriplestore.inference.InferenceEngineException; import org.apache.rya.sail.config.RyaSailFactory; +import org.apache.zookeeper.ClientCnxn; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; /** * Integration tests that ensure the Fluo application processes PCJs results @@ -156,6 +157,7 @@ public abstract class FluoITBase { } catch (final Exception e) { log.error("Could not shut down the Rya Connection.", e); } + } if (ryaRepo != null) { @@ -187,6 +189,9 @@ public abstract class FluoITBase { log.error("Could not shut down the Mini Fluo.", e); } } + + StatementPatternIdCacheSupplier.clear(); + MetadataCacheSupplier.clear(); } protected void preFluoInitHook() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java 
b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java index 7b16dcf..ca17b2a 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java @@ -53,6 +53,7 @@ import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterParameters; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaSubGraphExporterParameters; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer; @@ -64,11 +65,14 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternIdCacheSupplier; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.rdftriplestore.RyaSailRepository; import org.apache.rya.sail.config.RyaSailFactory; import org.junit.After; import org.junit.Before; +import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.repository.sail.SailRepositoryConnection; import org.openrdf.sail.Sail; @@ -115,6 +119,7 @@ public class KafkaExportITBase extends AccumuloExportITBase { // Setup the observers that will be used by the Fluo PCJ 
Application. final List<ObserverSpecification> observers = new ArrayList<>(); observers.add(new ObserverSpecification(TripleObserver.class.getName())); + observers.add(new ObserverSpecification(BatchObserver.class.getName())); observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); observers.add(new ObserverSpecification(JoinObserver.class.getName())); observers.add(new ObserverSpecification(FilterObserver.class.getName())); @@ -185,6 +190,12 @@ public class KafkaExportITBase extends AccumuloExportITBase { } } + @After + public void clearCaches() { + StatementPatternIdCacheSupplier.clear(); + MetadataCacheSupplier.clear(); + } + private void installRyaInstance() throws Exception { final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); final String instanceName = cluster.getInstanceName(); @@ -261,7 +272,7 @@ public class KafkaExportITBase extends AccumuloExportITBase { * If this test fails then its a testing environment issue, not with Rya. * Source: https://github.com/asmaier/mini-kafka */ -// @Test + @Test public void embeddedKafkaTest() throws Exception { // create topic final String topic = "testTopic"; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/62de7c5d/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java index 970cfd8..f540a2e 100644 --- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java @@ -132,6 +132,8 @@ public class GeoFunctionsIT 
extends RyaExportITBase { "PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> " + "SELECT ?cityA ?cityB " + "WHERE { " + + "?cityA <urn:containedIn> ?continent. " + + "?cityB <urn:containedIn> ?continent. " + "?cityA geo:asWKT ?coord1 . " + "?cityB geo:asWKT ?coord2 . " + // from brussels 173km to amsterdam @@ -147,9 +149,13 @@ public class GeoFunctionsIT extends RyaExportITBase { vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar2"), asWKT, vf.createLiteral("Point(-17.45 14.69)", wktTypeUri)), vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)), vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)), - vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri))); + vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)), + vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")), + vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar"), vf.createURI("urn:containedIn"), vf.createLiteral("Africa")), + vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#dakar2"), vf.createURI("urn:containedIn"), vf.createLiteral("Africa")), + vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe"))); - // The expected results of the SPARQL query once the PCJ has been computed. 
+ // The expected results of the SPARQL query once the PCJ has been computed.l final Set<BindingSet> expectedResults = new HashSet<>(); MapBindingSet bs = new MapBindingSet(); @@ -234,6 +240,8 @@ public class GeoFunctionsIT extends RyaExportITBase { "PREFIX geof: <http://www.opengis.net/def/function/geosparql/> " + "PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> " + "SELECT ?cityA ?cityB { " + + "?cityA <urn:containedIn> ?continent. " + + "?cityB <urn:containedIn> ?continent. " + "?cityA geo:asWKT ?coord1 . " + "?cityB geo:asWKT ?coord2 . " + " FILTER ( geof:sfIntersects(?coord1, ?coord2) ) " + @@ -248,6 +256,8 @@ public class GeoFunctionsIT extends RyaExportITBase { vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)), vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)), vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)), + vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")), + vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam2"), vf.createURI("urn:containedIn"), vf.createLiteral("Europe")), vf.createStatement(vf.createURI("tag:rya.apache.org,2017:ex#amsterdam2"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri))); // The expected results of the SPARQL query once the PCJ has been computed.
