http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java index 894ed84..c59cb87 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RangeBindingSetEntries.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.query; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.openrdf.query.BindingSet;
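[Editor's note, not part of the commit] This hunk, and the ones that follow, swap the old Maven license-plugin header ("#%L ... Copyright (C) 2014 Rya") for the standard ASF source header across the accumulo.rya module. As a hedged illustration only (the class name and approach below are assumptions, not anything used in this commit), a small check like the following could be used to find source files that still carry the old "#%L" marker after such a migration:

import java.io.IOException;
import java.nio.file.*;
import java.util.stream.Stream;

// Illustrative sketch only: walk a source tree and report .java files that
// still contain the old "#%L" license-plugin marker instead of the ASF header.
public class OldHeaderFinder {

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        try (Stream<Path> files = Files.walk(root)) {
            files.filter(p -> p.toString().endsWith(".java"))
                 .filter(OldHeaderFinder::hasOldMarker)
                 .forEach(p -> System.out.println("old header: " + p));
        }
    }

    private static boolean hasOldMarker(Path path) {
        try {
            // Requires Java 11+ for Files.readString.
            return Files.readString(path).contains("#%L");
        } catch (IOException e) {
            return false; // unreadable files are simply skipped
        }
    }
}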
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java index 8d90b0c..b4333bd 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementBindingSetKeyValueIterator.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.query; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import info.aduna.iteration.CloseableIteration; import java.util.Collection; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java index a4d0a40..f4c3081 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/RyaStatementKeyValueIterator.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.query; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import info.aduna.iteration.CloseableIteration; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java index 95d76b9..d2dcef9 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/query/ScannerBaseCloseableIterable.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.query; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ + + import com.google.common.base.Preconditions; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.data.Key; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java index a2381b2..97d2f54 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/utils/TimeRangeFilter.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.utils; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.Filter; @@ -83,4 +84,4 @@ public class TimeRangeFilter extends Filter { Long.parseLong(options.get(TIME_RANGE_PROP)); return true; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java index d1ddbaa..b7c9079 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRdfConfigurationTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import org.apache.accumulo.core.security.Authorizations; import org.junit.Test; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java index 9295dd9..ab4528b 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloRyaDAOTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ + + import info.aduna.iteration.CloseableIteration; import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; import mvm.rya.api.RdfCloudTripleStoreUtils; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/DefineTripleQueryRangeFactoryTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/DefineTripleQueryRangeFactoryTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/DefineTripleQueryRangeFactoryTest.java index 168e85c..7c3331d 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/DefineTripleQueryRangeFactoryTest.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/DefineTripleQueryRangeFactoryTest.java @@ -1,24 +1,24 @@ -//package mvm.rya.accumulo; - /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + +//package mvm.rya.accumulo; + // //import junit.framework.TestCase; //import mvm.rya.accumulo.AccumuloRdfConfiguration; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java index 07e7287..bda73e2 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountToolTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.mr.eval; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import mvm.rya.accumulo.AccumuloRdfConfiguration; import mvm.rya.accumulo.AccumuloRyaDAO; import mvm.rya.api.RdfCloudTripleStoreConstants; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java index 0d24b62..02b8357 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputToolTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.mr.fileinput; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ + + import java.util.Iterator; import java.util.Map; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java index 2a09669..5ac2d74 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.mr.upgrade; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import junit.framework.TestCase; import mvm.rya.accumulo.AccumuloRdfConfiguration; import mvm.rya.accumulo.AccumuloRyaDAO; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java index 027bd7e..b138292 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.mr.upgrade; /* - * #%L - * mvm.rya.accumulo.rya - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import mvm.rya.api.resolver.impl.*; import org.junit.Test; @@ -115,4 +116,4 @@ public class UpgradeObjectSerializationTest { assertEquals("c024000000000000", upgrade); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/pom.xml ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/pom.xml b/dao/cloudbase.rya/pom.xml deleted file mode 100644 index e08e111..0000000 --- a/dao/cloudbase.rya/pom.xml +++ /dev/null @@ -1,103 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>mvm.rya</groupId> - <artifactId>rya.dao</artifactId> - <version>3.2.10-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>cloudbase.rya</artifactId> - <name>${project.groupId}.${project.artifactId}</name> - <dependencies> - <dependency> - <groupId>mvm.rya</groupId> - <artifactId>rya.api</artifactId> - </dependency> - <dependency> - <groupId>mvm.rya</groupId> - <artifactId>cloudbase.utils</artifactId> - </dependency> - <dependency> - <groupId>mvm.rya</groupId> - <artifactId>cloudbase.iterators</artifactId> - </dependency> - - <dependency> - <groupId>mvm.rya</groupId> - <artifactId>rya.indexing</artifactId> - <version>${project.version}</version> - </dependency> - - <!-- Cloudbase deps --> - <dependency> - <groupId>cloudbase</groupId> - <artifactId>cloudbase-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <exclusions> - <!-- the log4j that comes with zookeeper 3.3.5 has some bad dependencies --> - <exclusion> - <groupId>javax.jms</groupId> - <artifactId>jms</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.texeltek</groupId> - <artifactId>accumulo-cloudbase-shim</artifactId> - <optional>true</optional> - </dependency> - <dependency> - <groupId>mvm.rya</groupId> - <artifactId>cloudbase.iterators</artifactId> - <optional>true</optional> - </dependency> - - - <!-- Sesame runtime --> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-rio-ntriples</artifactId> - 
<version>${openrdf.sesame.version}</version> - </dependency> - - </dependencies> - - <profiles> - <profile> - <id>mr</id> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - </transformers> - </configuration> - </execution> - </executions> - - </plugin> - </plugins> - </build> - </profile> - </profiles> - -</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java deleted file mode 100644 index 7980d85..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java +++ /dev/null @@ -1,59 +0,0 @@ -package mvm.rya.cloudbase; - -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.NoSuchElementException; - -/** - * The intention of this iterator is the wrap the iterator that is returned by a - * BatchScan in cloudbase in order to serve as a workaround for - * ACCUMULO-226 (https://issues.apache.org/jira/browse/ACCUMULO-226). The bug - * involves subsequent calls to hasNext() on batch scan results after false has been - * returned will return true - * <p/> - * A patch has been submitted and accepted in Accumulo but this wrapper can be used - * for previous versions of Cloudbase/Accumulo that do not yet have the patch. 
- */ -public class BatchScannerIterator implements Iterator<Entry<Key, Value>> { - - private Iterator<Entry<Key, Value>> cloudbaseScanner = null; - - private Entry<Key, Value> nextKeyValue = null; - - public BatchScannerIterator(Iterator<Entry<Key, Value>> cloudbaseScanner) { - this.cloudbaseScanner = cloudbaseScanner; - } - - public boolean hasNext() { - if (nextKeyValue == null) { - if (cloudbaseScanner.hasNext()) { - nextKeyValue = cloudbaseScanner.next(); - } - } - return !isTerminatingKeyValue(nextKeyValue); - } - - private boolean isTerminatingKeyValue(Entry<Key, Value> nextEntry) { - if (nextEntry == null) { - return true; - } - return !(nextEntry.getKey() != null && nextEntry.getValue() != null); //Condition taken from cloudbase's TabletServerBatchReaderIterator - } - - public Entry<Key, Value> next() { - if (hasNext()) { - Entry<Key, Value> entry = nextKeyValue; - nextKeyValue = null; - return entry; - } else { - throw new NoSuchElementException(); - } - } - - public void remove() { - cloudbaseScanner.remove(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java deleted file mode 100644 index b20d79c..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java +++ /dev/null @@ -1,78 +0,0 @@ -package mvm.rya.cloudbase; - -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; -import com.google.common.base.Preconditions; -import info.aduna.iteration.CloseableIteration; -import mvm.rya.api.persist.RdfDAOException; -import org.openrdf.model.Namespace; -import org.openrdf.model.impl.NamespaceImpl; - -import java.io.IOError; -import java.util.Iterator; -import java.util.Map.Entry; - -public class CloudbaseNamespaceTableIterator<T extends Namespace> implements - CloseableIteration<Namespace, RdfDAOException> { - - private boolean open = false; - private Iterator<Entry<Key, Value>> result; - - public CloudbaseNamespaceTableIterator(Iterator<Entry<Key, Value>> result) throws RdfDAOException { - Preconditions.checkNotNull(result); - open = true; - this.result = result; - } - - @Override - public void close() throws RdfDAOException { - try { - verifyIsOpen(); - open = false; - } catch (IOError e) { - throw new RdfDAOException(e); - } - } - - public void verifyIsOpen() throws RdfDAOException { - if (!open) { - throw new RdfDAOException("Iterator not open"); - } - } - - @Override - public boolean hasNext() throws RdfDAOException { - verifyIsOpen(); - return result != null && result.hasNext(); - } - - @Override - public Namespace next() throws RdfDAOException { - if (hasNext()) { - return getNamespace(result); - } - return null; - } - - public static Namespace getNamespace(Iterator<Entry<Key, Value>> rowResults) { - for (; rowResults.hasNext(); ) { - Entry<Key, Value> next = rowResults.next(); - Key key = next.getKey(); - Value val = next.getValue(); - String cf = key.getColumnFamily().toString(); - String cq = key.getColumnQualifier().toString(); - return new NamespaceImpl(key.getRow().toString(), new String( - val.get())); - } - return null; - } - - @Override - public void remove() throws RdfDAOException { - next(); - } - - public boolean isOpen() { - return open; - } -} 
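[Editor's note, not part of the commit] The deleted BatchScannerIterator above works around ACCUMULO-226, where repeated hasNext() calls on batch-scan results could return true again after returning false. The core pattern it relies on is a look-ahead wrapper that caches one element so hasNext() stays consistent once the underlying iterator is exhausted. A generic, hedged sketch of that pattern (illustrative only, not the original class, and assuming the delegate never yields null elements) looks like this:

import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative look-ahead wrapper: buffers the next element so that once the
// delegate is exhausted, hasNext() keeps returning false on every call.
public class LookAheadIterator<T> implements Iterator<T> {

    private final Iterator<T> delegate;
    private T next; // buffered element; null means "nothing buffered yet"

    public LookAheadIterator(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        if (next == null && delegate.hasNext()) {
            next = delegate.next();
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = next;
        next = null; // clear the buffer so the next call advances the delegate
        return result;
    }
}

The original class additionally treated an entry with a null key or value as a terminating marker (mirroring Cloudbase's TabletServerBatchReaderIterator); the sketch above omits that Cloudbase-specific detail.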
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java deleted file mode 100644 index e25c910..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java +++ /dev/null @@ -1,44 +0,0 @@ -package mvm.rya.cloudbase; - -import cloudbase.core.security.Authorizations; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.hadoop.conf.Configuration; - -/** - * Created by IntelliJ IDEA. - * Date: 4/25/12 - * Time: 3:24 PM - * To change this template use File | Settings | File Templates. - */ -public class CloudbaseRdfConfiguration extends RdfCloudTripleStoreConfiguration { - - public static final String MAXRANGES_SCANNER = "cb.query.maxranges"; - - public CloudbaseRdfConfiguration() { - super(); - } - - public CloudbaseRdfConfiguration(Configuration other) { - super(other); - } - - @Override - public CloudbaseRdfConfiguration clone() { - return new CloudbaseRdfConfiguration(this); - } - - public Authorizations getAuthorizations() { - String[] auths = getAuths(); - if (auths == null || auths.length == 0) - return CloudbaseRdfConstants.ALL_AUTHORIZATIONS; - return new Authorizations(auths); - } - - public void setMaxRangesForScanner(Integer max) { - setInt(MAXRANGES_SCANNER, max); - } - - public Integer getMaxRangesForScanner() { - return getInt(MAXRANGES_SCANNER, 2); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java deleted file mode 100644 index 690a050..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java +++ /dev/null @@ -1,20 +0,0 @@ -package mvm.rya.cloudbase; - -import cloudbase.core.CBConstants; -import cloudbase.core.data.Value; -import cloudbase.core.security.Authorizations; -import cloudbase.core.security.ColumnVisibility; - -/** - * Interface CloudbaseRdfConstants - * Date: Mar 1, 2012 - * Time: 7:24:52 PM - */ -public interface CloudbaseRdfConstants { - public static final Authorizations ALL_AUTHORIZATIONS = CBConstants.NO_AUTHS; - - public static final Value EMPTY_VALUE = new Value(new byte[0]); - - public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(new byte[0]); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java deleted file mode 100644 index 075d1fe..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java +++ /dev/null @@ -1,138 +0,0 @@ -package mvm.rya.cloudbase; - -import cloudbase.core.client.Connector; -import cloudbase.core.client.Scanner; -import cloudbase.core.client.admin.TableOperations; -import cloudbase.core.data.Key; 
-import cloudbase.core.data.Range; -import cloudbase.core.security.Authorizations; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreStatement; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RdfDAOException; -import mvm.rya.api.persist.RdfEvalStatsDAO; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.openrdf.model.Resource; -import org.openrdf.model.Value; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; - -import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.api.RdfCloudTripleStoreConstants.*; - -/** - * Class CloudbaseRdfEvalStatsDAO - * Date: Feb 28, 2012 - * Time: 5:03:16 PM - */ -public class CloudbaseRdfEvalStatsDAO implements RdfEvalStatsDAO<CloudbaseRdfConfiguration> { - - private boolean initialized = false; - private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration(); - - private Collection<RdfCloudTripleStoreStatement> statements = new ArrayList<RdfCloudTripleStoreStatement>(); - private Connector connector; - - // private String evalTable = TBL_EVAL; - private TableLayoutStrategy tableLayoutStrategy; - - @Override - public void init() throws RdfDAOException { - try { - if (isInitialized()) { - throw new IllegalStateException("Already initialized"); - } - checkNotNull(connector); - tableLayoutStrategy = conf.getTableLayoutStrategy(); -// evalTable = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); -// conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); - - TableOperations tos = connector.tableOperations(); - CloudbaseRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getEval()); -// boolean tableExists = tos.exists(evalTable); -// if (!tableExists) -// tos.create(evalTable); - initialized = true; - } catch (Exception e) { - throw new RdfDAOException(e); - } - } - - @Override - public double getCardinality(CloudbaseRdfConfiguration conf, CARDINALITY_OF card, Value val) throws RdfDAOException { - return this.getCardinality(conf, card, val, null); - } - - @Override - public double getCardinality(CloudbaseRdfConfiguration conf, CARDINALITY_OF card, Value val, Resource context) throws RdfDAOException { - try { - Authorizations authorizations = conf.getAuthorizations(); - Scanner scanner = connector.createScanner(tableLayoutStrategy.getEval(), authorizations); - Text cfTxt = null; - if (CARDINALITY_OF.SUBJECT.equals(card)) { - cfTxt = SUBJECT_CF_TXT; - } else if (CARDINALITY_OF.PREDICATE.equals(card)) { - cfTxt = PRED_CF_TXT; - } else if (CARDINALITY_OF.OBJECT.equals(card)) { -// cfTxt = OBJ_CF_TXT; //TODO: How do we do object cardinality - return Double.MAX_VALUE; - } else throw new IllegalArgumentException("Not right Cardinality[" + card + "]"); - Text cq = EMPTY_TEXT; - if (context != null) { - cq = new Text(context.stringValue().getBytes()); - } - scanner.fetchColumn(cfTxt, cq); - scanner.setRange(new Range(new Text(val.stringValue().getBytes()))); - Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iter = scanner.iterator(); - if (iter.hasNext()) { - return Double.parseDouble(new String(iter.next().getValue().get())); - } - } catch (Exception e) { - throw new RdfDAOException(e); - } - - //default - return -1; - } - - @Override - public void destroy() throws RdfDAOException { - if (!isInitialized()) { - throw new IllegalStateException("Not initialized"); - } - initialized = false; - } - - @Override - public 
boolean isInitialized() throws RdfDAOException { - return initialized; - } - - public Connector getConnector() { - return connector; - } - - public void setConnector(Connector connector) { - this.connector = connector; - } - -// public String getEvalTable() { -// return evalTable; -// } -// -// public void setEvalTable(String evalTable) { -// this.evalTable = evalTable; -// } - - public CloudbaseRdfConfiguration getConf() { - return conf; - } - - public void setConf(CloudbaseRdfConfiguration conf) { - this.conf = conf; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java deleted file mode 100644 index 9114ae8..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java +++ /dev/null @@ -1,50 +0,0 @@ -package mvm.rya.cloudbase; - -import cloudbase.core.client.CBException; -import cloudbase.core.client.CBSecurityException; -import cloudbase.core.client.TableExistsException; -import cloudbase.core.client.admin.TableOperations; -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; -import mvm.rya.api.resolver.triple.TripleRow; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; - -/** - * Class CloudbaseRdfUtils - * Date: Mar 1, 2012 - * Time: 7:15:54 PM - */ -public class CloudbaseRdfUtils { - private static final Log logger = LogFactory.getLog(CloudbaseRyaDAO.class); - - public static void createTableIfNotExist(TableOperations tableOperations, String tableName) throws TableExistsException, CBSecurityException, CBException { - boolean tableExists = tableOperations.exists(tableName); - if (!tableExists) { - logger.info("Creating cloudbase table: " + tableName); - tableOperations.create(tableName); - } - } - - public static Key from(TripleRow tripleRow) { - return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES), - defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES), - defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES), - defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES), - defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE)); - } - - public static Value extractValue(TripleRow tripleRow) { - return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES)); - } - - private static byte[] defaultTo(byte[] bytes, byte[] def) { - return bytes != null ? bytes : def; - } - - private static Long defaultTo(Long l, Long def) { - return l != null ? 
l : def; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java deleted file mode 100644 index a3045e6..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java +++ /dev/null @@ -1,428 +0,0 @@ -package mvm.rya.cloudbase; - -import cloudbase.core.client.*; -import cloudbase.core.client.Scanner; -import cloudbase.core.client.admin.TableOperations; -import cloudbase.core.client.impl.TabletServerBatchDeleter; -import cloudbase.core.conf.Property; -import cloudbase.core.data.Key; -import cloudbase.core.data.Mutation; -import cloudbase.core.data.Range; -import cloudbase.core.security.Authorizations; -import cloudbase.core.security.ColumnVisibility; -import com.google.common.collect.Iterators; -import info.aduna.iteration.CloseableIteration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RyaDAO; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.persist.RyaNamespaceManager; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; -import mvm.rya.cloudbase.query.CloudbaseRyaQueryEngine; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.openrdf.model.Namespace; - -import java.text.SimpleDateFormat; -import java.util.*; - -import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.api.RdfCloudTripleStoreConstants.*; -import static mvm.rya.cloudbase.CloudbaseRdfConstants.ALL_AUTHORIZATIONS; -import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_CV; - -/** - * Class CloudbaseRyaDAO - * Date: Feb 29, 2012 - * Time: 12:37:22 PM - */ -public class CloudbaseRyaDAO implements RyaDAO<CloudbaseRdfConfiguration>, RyaNamespaceManager<CloudbaseRdfConfiguration> { - private static final Log logger = LogFactory.getLog(CloudbaseRyaDAO.class); - - private boolean initialized = false; - private Connector connector; - - private BatchWriter bw_spo; - private BatchWriter bw_po; - private BatchWriter bw_osp; - private BatchWriter bw_ns; - - private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration(); - private ColumnVisibility cv = EMPTY_CV; - private RyaTableMutationsFactory ryaTableMutationsFactory = new RyaTableMutationsFactory(); - private TableLayoutStrategy tableLayoutStrategy; - private CloudbaseRyaQueryEngine queryEngine; - private RyaContext ryaContext = RyaContext.getInstance(); - - @Override - public boolean isInitialized() throws RyaDAOException { - return initialized; - } - - @Override - public void init() throws RyaDAOException { - if (initialized) - return; - try { - checkNotNull(conf); - checkNotNull(connector); - - tableLayoutStrategy = conf.getTableLayoutStrategy(); - String cv_s = conf.getCv(); - if (cv_s != null) { - cv = new ColumnVisibility(cv_s); - } - - TableOperations tableOperations = connector.tableOperations(); - CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); - CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); - 
CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp()); - CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs()); - - //get the batch writers for tables - bw_spo = connector.createBatchWriter(tableLayoutStrategy.getSpo(), MAX_MEMORY, MAX_TIME, - NUM_THREADS); - bw_po = connector.createBatchWriter(tableLayoutStrategy.getPo(), MAX_MEMORY, MAX_TIME, - NUM_THREADS); - bw_osp = connector.createBatchWriter(tableLayoutStrategy.getOsp(), MAX_MEMORY, MAX_TIME, - NUM_THREADS); - - bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), MAX_MEMORY, - MAX_TIME, 1); - - queryEngine = new CloudbaseRyaQueryEngine(connector, getConf()); - - checkVersion(); - - initialized = true; - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - public String getVersion() throws RyaDAOException { - String version = null; - CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf); - if (versIter.hasNext()) { - version = versIter.next().getObject().getData(); - } - versIter.close(); - - return version; - } - - @Override - public void add(RyaStatement statement) throws RyaDAOException { - commit(Iterators.singletonIterator(statement)); - } - - @Override - public void add(Iterator<RyaStatement> iter) throws RyaDAOException { - commit(iter); - } - - @Override - public void delete(RyaStatement stmt, CloudbaseRdfConfiguration aconf) throws RyaDAOException { - this.delete(Iterators.singletonIterator(stmt), aconf); - } - - @Override - public void delete(Iterator<RyaStatement> statements, CloudbaseRdfConfiguration conf) throws RyaDAOException { - try { - while (statements.hasNext()) { - RyaStatement stmt = statements.next(); - //query first - CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf); - while (query.hasNext()) { - deleteSingleRyaStatement(query.next()); - } - } - bw_spo.flush(); - bw_po.flush(); - bw_osp.flush(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - protected void deleteSingleRyaStatement(RyaStatement stmt) throws TripleRowResolverException, MutationsRejectedException { - Map<TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(stmt); - bw_spo.addMutation(deleteMutation(map.get(TABLE_LAYOUT.SPO))); - bw_po.addMutation(deleteMutation(map.get(TABLE_LAYOUT.PO))); - bw_osp.addMutation(deleteMutation(map.get(TABLE_LAYOUT.OSP))); - } - - protected Mutation deleteMutation(TripleRow tripleRow) { - Mutation m = new Mutation(new Text(tripleRow.getRow())); - - byte[] columnFamily = tripleRow.getColumnFamily(); - Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); - - byte[] columnQualifier = tripleRow.getColumnQualifier(); - Text cqText = columnQualifier == null ? 
EMPTY_TEXT : new Text(columnQualifier); - - m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()), tripleRow.getTimestamp()); - return m; - } - - protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException { - try { - //TODO: Should have a lock here in case we are adding and committing at the same time - while (commitStatements.hasNext()) { - - Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(commitStatements.next()); - Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); - bw_spo.addMutations(spo); - bw_po.addMutations(po); - bw_osp.addMutations(osp); - } - - bw_spo.flush(); - bw_po.flush(); - bw_osp.flush(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public void destroy() throws RyaDAOException { - if (!initialized) { - return; - } - //TODO: write lock - try { - initialized = false; - bw_osp.flush(); - bw_spo.flush(); - bw_po.flush(); - bw_ns.flush(); - - bw_osp.close(); - bw_spo.close(); - bw_po.close(); - bw_ns.close(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public void addNamespace(String pfx, String namespace) throws RyaDAOException { - try { - Mutation m = new Mutation(new Text(pfx)); - m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new cloudbase.core.data.Value( - namespace.getBytes())); - bw_ns.addMutation(m); - bw_ns.flush(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public String getNamespace(String pfx) throws RyaDAOException { - try { - Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), - ALL_AUTHORIZATIONS); - scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT); - scanner.setRange(new Range(new Text(pfx))); - Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iterator = scanner - .iterator(); - - if (iterator.hasNext()) { - return new String(iterator.next().getValue().get()); - } - } catch (Exception e) { - throw new RyaDAOException(e); - } - return null; - } - - @Override - public void removeNamespace(String pfx) throws RyaDAOException { - try { - Mutation del = new Mutation(new Text(pfx)); - del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT); - bw_ns.addMutation(del); - bw_ns.flush(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException { - try { - Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), - ALL_AUTHORIZATIONS); - scanner.fetchColumnFamily(INFO_NAMESPACE_TXT); - Iterator<Map.Entry<Key, cloudbase.core.data.Value>> result = scanner.iterator(); - return new CloudbaseNamespaceTableIterator(result); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public RyaNamespaceManager<CloudbaseRdfConfiguration> getNamespaceManager() { - return this; - } - - @Override - public void purge(RdfCloudTripleStoreConfiguration configuration) { - for (String tableName : getTables()) { - try { - purge(tableName, configuration.getAuths()); - compact(tableName); - } catch (TableNotFoundException e) { - logger.error(e.getMessage()); - } catch (MutationsRejectedException e) { - logger.error(e.getMessage()); - } - } - try { - if (isInitialized()) { - checkVersion(); - } - } catch (RyaDAOException e) { - logger.error("checkVersion() failed?", e); - } - } - - @Override - 
public void dropAndDestroy() throws RyaDAOException { - for (String tableName : getTables()) { - try { - drop(tableName); - } catch (CBSecurityException e) { - logger.error(e.getMessage()); - throw new RyaDAOException(e); - } catch (CBException e) { - logger.error(e.getMessage()); - throw new RyaDAOException(e); - } catch (TableNotFoundException e) { - logger.warn(e.getMessage()); - } - } - destroy(); - } - - public Connector getConnector() { - return connector; - } - - public void setConnector(Connector connector) { - this.connector = connector; - } - - public CloudbaseRdfConfiguration getConf() { - return conf; - } - - public void setConf(CloudbaseRdfConfiguration conf) { - this.conf = conf; - } - - public RyaTableMutationsFactory getRyaTableMutationsFactory() { - return ryaTableMutationsFactory; - } - - public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) { - this.ryaTableMutationsFactory = ryaTableMutationsFactory; - } - - public CloudbaseRyaQueryEngine getQueryEngine() { - return queryEngine; - } - - public void setQueryEngine(CloudbaseRyaQueryEngine queryEngine) { - this.queryEngine = queryEngine; - } - - protected String[] getTables() { - return new String[] { - tableLayoutStrategy.getSpo() - , tableLayoutStrategy.getPo() - , tableLayoutStrategy.getOsp() - , tableLayoutStrategy.getNs() - , tableLayoutStrategy.getEval() - }; - } - - private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException { - if (tableExists(tableName)) { - logger.info("Purging cloudbase table: " + tableName); - BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths)); - try { - batchDeleter.setRanges(Collections.singleton(new Range())); - batchDeleter.delete(); - } finally { - ((TabletServerBatchDeleter)batchDeleter).close(); - } - } - } - - private void compact(String tableName) { - Date now = new Date(System.currentTimeMillis()); - SimpleDateFormat dateParser = new SimpleDateFormat("yyyyMMddHHmmssz", Locale.getDefault()); - String nowStr = dateParser.format(now); - try { - for (Map.Entry<String, String> prop : connector.tableOperations().getProperties(tableName)) { - if (prop.getKey().equals(Property.TABLE_MAJC_COMPACTALL_AT.getKey())) { - if (dateParser.parse(prop.getValue()).after(now)) { - return; - } else { - break; - } - } - } - - connector.tableOperations().flush(tableName); - logger.info("Requesting major compaction for table " + tableName); - connector.tableOperations().setProperty(tableName, Property.TABLE_MAJC_COMPACTALL_AT.getKey(), nowStr); - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - - private Authorizations getAuthorizations(String auth) { - if (auth == null) { - return new Authorizations(); - } else { - String[] auths = auth.split(","); - return new Authorizations(auths); - } - } - - private boolean tableExists(String tableName) { - return getConnector().tableOperations().exists(tableName); - } - - private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException { - return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS); - } - - private void checkVersion() throws RyaDAOException { - String version = getVersion(); - if (version == null) { - this.add(getVersionRyaStatement()); - } - //TODO: Do a version check here - } - - protected RyaStatement getVersionRyaStatement() { - return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA); 
- } - - private void drop(String tableName) throws CBSecurityException, CBException, TableNotFoundException { - logger.info("Dropping cloudbase table: " + tableName); - connector.tableOperations().delete(tableName); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java deleted file mode 100644 index 8869759..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java +++ /dev/null @@ -1,93 +0,0 @@ -package mvm.rya.cloudbase; - -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; -import cloudbase.core.security.ColumnVisibility; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; -import org.apache.hadoop.io.Text; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; - -import static java.util.AbstractMap.SimpleEntry; -import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_CV; -import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_VALUE; - -public class RyaTableKeyValues { - public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(); - public static final Text EMPTY_CV_TEXT = new Text(EMPTY_CV.getExpression()); - - RyaContext instance = RyaContext.getInstance(); - - private RyaStatement stmt; - private Collection<Map.Entry<Key, Value>> spo = new ArrayList<Map.Entry<Key, Value>>(); - private Collection<Map.Entry<Key, Value>> po = new ArrayList<Map.Entry<Key, Value>>(); - private Collection<Map.Entry<Key, Value>> osp = new ArrayList<Map.Entry<Key, Value>>(); - - public RyaTableKeyValues(RyaStatement stmt) { - this.stmt = stmt; - } - - public Collection<Map.Entry<Key, Value>> getSpo() { - return spo; - } - - public Collection<Map.Entry<Key, Value>> getPo() { - return po; - } - - public Collection<Map.Entry<Key, Value>> getOsp() { - return osp; - } - - public RyaTableKeyValues invoke() throws IOException { - /** - * TODO: If there are contexts, do we still replicate the information into the default graph as well - * as the named graphs? - */try { - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt); - TripleRow tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); - byte[] columnVisibility = tripleRow.getColumnVisibility(); - Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new Text(columnVisibility); - Long timestamp = tripleRow.getTimestamp(); - timestamp = timestamp == null ? 0l : timestamp; - byte[] value = tripleRow.getValue(); - Value v = value == null ? 
EMPTY_VALUE : new Value(value); - spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), - new Text(tripleRow.getColumnFamily()), - new Text(tripleRow.getColumnQualifier()), - cv, timestamp), v)); - tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); - po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), - new Text(tripleRow.getColumnFamily()), - new Text(tripleRow.getColumnQualifier()), - cv, timestamp), v)); - tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); - osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), - new Text(tripleRow.getColumnFamily()), - new Text(tripleRow.getColumnQualifier()), - cv, timestamp), v)); - } catch (TripleRowResolverException e) { - throw new IOException(e); - } - return this; - } - - @Override - public String toString() { - return "RyaTableKeyValues{" + - "statement=" + stmt + - ", spo=" + spo + - ", po=" + po + - ", o=" + osp + - '}'; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java deleted file mode 100644 index ab9b37d..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java +++ /dev/null @@ -1,81 +0,0 @@ -package mvm.rya.cloudbase; - -import cloudbase.core.data.Mutation; -import cloudbase.core.data.Value; -import cloudbase.core.security.ColumnVisibility; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; -import org.apache.hadoop.io.Text; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_CV; -import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_VALUE; - -public class RyaTableMutationsFactory { - - RyaContext ryaContext = RyaContext.getInstance(); - - public RyaTableMutationsFactory() { - } - - //TODO: Does this still need to be collections - public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialize( - RyaStatement stmt) throws IOException { - - Collection<Mutation> spo_muts = new ArrayList<Mutation>(); - Collection<Mutation> po_muts = new ArrayList<Mutation>(); - Collection<Mutation> osp_muts = new ArrayList<Mutation>(); - /** - * TODO: If there are contexts, do we still replicate the information into the default graph as well - * as the named graphs? 
- */ - try { - Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt); - TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO); - spo_muts.add(createMutation(tripleRow)); - tripleRow = rowMap.get(TABLE_LAYOUT.PO); - po_muts.add(createMutation(tripleRow)); - tripleRow = rowMap.get(TABLE_LAYOUT.OSP); - osp_muts.add(createMutation(tripleRow)); - } catch (TripleRowResolverException fe) { - throw new IOException(fe); - } - - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations = - new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>(); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts); - - return mutations; - } - - protected Mutation createMutation(TripleRow tripleRow) { - Mutation mutation = new Mutation(new Text(tripleRow.getRow())); - byte[] columnVisibility = tripleRow.getColumnVisibility(); - ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); - Long timestamp = tripleRow.getTimestamp(); - timestamp = timestamp == null ? 0l : timestamp; - byte[] value = tripleRow.getValue(); - Value v = value == null ? EMPTY_VALUE : new Value(value); - byte[] columnQualifier = tripleRow.getColumnQualifier(); - Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); - byte[] columnFamily = tripleRow.getColumnFamily(); - Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); - - mutation.put(cfText,cqText, cv, timestamp, v); - return mutation; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java deleted file mode 100644 index 5c6e8cf..0000000 --- a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java +++ /dev/null @@ -1,350 +0,0 @@ -package mvm.rya.cloudbase.mr.eval; - -import cloudbase.core.CBConstants; -import cloudbase.core.client.mapreduce.CloudbaseInputFormat; -import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; -import cloudbase.core.data.Key; -import cloudbase.core.data.Mutation; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.FilteringIterator; -import cloudbase.core.iterators.filter.AgeOffFilter; -import cloudbase.core.security.Authorizations; -import cloudbase.core.security.ColumnVisibility; -import cloudbase.core.util.Pair; -import com.google.common.collect.Lists; -import com.google.common.io.ByteArrayDataInput; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.RdfCloudTripleStoreUtils; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; -import mvm.rya.cloudbase.CloudbaseRdfConstants; -import mvm.rya.cloudbase.mr.utils.MRUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; 
-import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; - -/** - * Count subject, predicate, object. Save in table - * Class RdfCloudTripleStoreCountTool - * Date: Apr 12, 2011 - * Time: 10:39:40 AM - */ -public class CloudbaseRdfCountTool implements Tool { - - public static final String TTL_PROP = "mvm.rya.cloudbase.sail.mr.eval.ttl"; - - private Configuration conf; - - public static void main(String[] args) { - try { - - ToolRunner.run(new Configuration(), new CloudbaseRdfCountTool(), args); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * cloudbase props - */ - private RdfCloudTripleStoreConstants.TABLE_LAYOUT rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP; - private String userName = "root"; - private String pwd = "password"; - private String instance = "stratus"; - private String zk = "10.40.190.113:2181"; - private Authorizations authorizations = CBConstants.NO_AUTHS; - private String ttl = null; - - @Override - public int run(String[] strings) throws Exception { - conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics"); - - //conf - zk = conf.get(MRUtils.CB_ZK_PROP, zk); - ttl = conf.get(MRUtils.CB_TTL_PROP, ttl); - instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance); - userName = conf.get(MRUtils.CB_USERNAME_PROP, userName); - pwd = conf.get(MRUtils.CB_PWD_PROP, pwd); - boolean mock = conf.getBoolean(MRUtils.CB_MOCK_PROP, false); - String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); - if (tablePrefix != null) - RdfCloudTripleStoreConstants.prefixTables(tablePrefix); - rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf( - conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString())); - - String auth = conf.get(MRUtils.CB_AUTH_PROP); - if (auth != null) - authorizations = new Authorizations(auth.split(",")); - - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - conf.set("io.sort.mb", "256"); - Job job = new Job(conf); - job.setJarByClass(CloudbaseRdfCountTool.class); - - //set ttl - ttl = conf.get(TTL_PROP); - - // set up cloudbase input - job.setInputFormatClass(CloudbaseInputFormat.class); - CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(), - RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix), authorizations); - CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk); - Collection<Pair<Text, Text>> columns = new ArrayList<Pair<Text, Text>>(); - //TODO: What about named graphs/contexts here? 
-// final Pair pair = new Pair(RdfCloudTripleStoreConstants.INFO_TXT, RdfCloudTripleStoreConstants.INFO_TXT); -// columns.add(pair); -// CloudbaseInputFormat.fetchColumns(job, columns); - if (ttl != null) { - CloudbaseInputFormat.setIterator(job, 1, FilteringIterator.class.getName(), "filteringIterator"); - CloudbaseInputFormat.setIteratorOption(job, "filteringIterator", "0", AgeOffFilter.class.getName()); - CloudbaseInputFormat.setIteratorOption(job, "filteringIterator", "0.ttl", ttl); - } - - CloudbaseInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE})))); - - // set input output of the particular job - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(LongWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Mutation.class); - - // set mapper and reducer classes - job.setMapperClass(CountPiecesMapper.class); - job.setCombinerClass(CountPiecesCombiner.class); - job.setReducerClass(CountPiecesReducer.class); - - CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk); - job.setOutputFormatClass(CloudbaseOutputFormat.class); - - // Submit the job - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 0 : 1; - - if (exitCode == 0) { - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " - + (end_time.getTime() - startTime.getTime()) / 1000 - + " seconds."); - return 0; - } else { - System.out.println("Job Failed!!!"); - } - - return -1; - } - - @Override - public void setConf(Configuration configuration) { - this.conf = configuration; - } - - @Override - public Configuration getConf() { - return conf; - } - - public String getInstance() { - return instance; - } - - public void setInstance(String instance) { - this.instance = instance; - } - - public String getPwd() { - return pwd; - } - - public void setPwd(String pwd) { - this.pwd = pwd; - } - - public String getZk() { - return zk; - } - - public void setZk(String zk) { - this.zk = zk; - } - - public String getTtl() { - return ttl; - } - - public void setTtl(String ttl) { - this.ttl = ttl; - } - - public String getUserName() { - return userName; - } - - public void setUserName(String userName) { - this.userName = userName; - } - - public static class CountPiecesMapper extends Mapper<Key, Value, Text, LongWritable> { - - public static final byte[] EMPTY_BYTES = new byte[0]; - private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP; - - ValueFactoryImpl vf = new ValueFactoryImpl(); - - private Text keyOut = new Text(); - private LongWritable valOut = new LongWritable(1); - private RyaContext ryaContext = RyaContext.getInstance(); - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf( - conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString())); - } - - @Override - protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { - try { - RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), 
key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes())); - //count each piece subject, pred, object - - String subj = statement.getSubject().getData(); - String pred = statement.getPredicate().getData(); -// byte[] objBytes = tripleFormat.getValueFormat().serialize(statement.getObject()); - RyaURI scontext = statement.getContext(); - boolean includesContext = scontext != null; - String scontext_str = (includesContext) ? scontext.getData() : null; - - ByteArrayDataOutput output = ByteStreams.newDataOutput(); - output.writeUTF(subj); - output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF); - output.writeBoolean(includesContext); - if (includesContext) - output.writeUTF(scontext_str); - keyOut.set(output.toByteArray()); - context.write(keyOut, valOut); - - output = ByteStreams.newDataOutput(); - output.writeUTF(pred); - output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF); - output.writeBoolean(includesContext); - if (includesContext) - output.writeUTF(scontext_str); - keyOut.set(output.toByteArray()); - context.write(keyOut, valOut); - - - //TODO: Obj in eval stats table? -// output = ByteStreams.newDataOutput(); -// output.write(objBytes); -// output.writeByte(RdfCloudTripleStoreConstants.DELIM_BYTE); -// output.writeUTF(RdfCloudTripleStoreConstants.OBJ_CF); -// output.writeBoolean(includesContext); -// if (includesContext) -// output.write(scontext_bytes); -// keyOut.set(output.toByteArray()); -// context.write(keyOut, valOut); - } catch (TripleRowResolverException e) { - throw new IOException(e); - } - } - } - - public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> { - - private LongWritable valOut = new LongWritable(); - - // TODO: can still add up to be larger I guess - // any count lower than this does not need to be saved - public static final int TOO_LOW = 2; - - @Override - protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { - long count = 0; - for (LongWritable lw : values) { - count += lw.get(); - } - - if (count <= TOO_LOW) - return; - - valOut.set(count); - context.write(key, valOut); - } - - } - - public static class CountPiecesReducer extends Reducer<Text, LongWritable, Text, Mutation> { - - Text row = new Text(); - Text cat_txt = new Text(); - Value v_out = new Value(); - ValueFactory vf = new ValueFactoryImpl(); - - // any count lower than this does not need to be saved - public static final int TOO_LOW = 10; - private String tablePrefix; - protected Text table; - private ColumnVisibility cv = CloudbaseRdfConstants.EMPTY_CV; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); - table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - final String cv_s = context.getConfiguration().get(MRUtils.CB_CV_PROP); - if (cv_s != null) - cv = new ColumnVisibility(cv_s); - } - - @Override - protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { - long count = 0; - for (LongWritable lw : values) { - count += lw.get(); - } - - if (count <= TOO_LOW) - return; - - ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes()); - String v = badi.readUTF(); - cat_txt.set(badi.readUTF()); - - Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT; - boolean includesContext = 
badi.readBoolean(); - if (includesContext) { - columnQualifier = new Text(badi.readUTF()); - } - - row.set(v); - Mutation m = new Mutation(row); - v_out.set((count + "").getBytes()); - m.put(cat_txt, columnQualifier, cv, v_out); - context.write(table, m); - } - - } -} \ No newline at end of file
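
Note on the removed mutation-building code: RyaTableMutationsFactory (deleted above) turned each serialized TripleRow into one Mutation per table layout (SPO, PO, OSP), substituting empty defaults for any null visibility, timestamp, family, qualifier, or value. The sketch below shows that same null-defaulting pattern written against the Accumulo 1.x client API rather than the Cloudbase one; it is illustrative only, not the project's Accumulo-side port, and the class name TripleRowMutationSketch and the EMPTY_* constants are placeholders.

import mvm.rya.api.resolver.triple.TripleRow;

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.Text;

public final class TripleRowMutationSketch {

    // Empty defaults used when a TripleRow field is null, mirroring the
    // EMPTY_CV / EMPTY_VALUE / EMPTY_TEXT constants in the deleted code.
    private static final Text EMPTY_TEXT = new Text();
    private static final ColumnVisibility EMPTY_CV = new ColumnVisibility();
    private static final Value EMPTY_VALUE = new Value(new byte[0]);

    /** Builds one Accumulo Mutation from a serialized TripleRow. */
    public static Mutation toMutation(TripleRow tripleRow) {
        Mutation mutation = new Mutation(new Text(tripleRow.getRow()));

        byte[] cvBytes = tripleRow.getColumnVisibility();
        ColumnVisibility cv = cvBytes == null ? EMPTY_CV : new ColumnVisibility(cvBytes);

        Long timestamp = tripleRow.getTimestamp();
        long ts = timestamp == null ? 0L : timestamp;

        byte[] valueBytes = tripleRow.getValue();
        Value value = valueBytes == null ? EMPTY_VALUE : new Value(valueBytes);

        byte[] cfBytes = tripleRow.getColumnFamily();
        Text cf = cfBytes == null ? EMPTY_TEXT : new Text(cfBytes);

        byte[] cqBytes = tripleRow.getColumnQualifier();
        Text cq = cqBytes == null ? EMPTY_TEXT : new Text(cqBytes);

        mutation.put(cf, cq, cv, ts, value);
        return mutation;
    }
}

A caller would typically invoke toMutation(...) once for each of the three rows produced by RyaContext.serializeTriple(...) and hand the results to the writers for the corresponding SPO, PO, and OSP tables.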