http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.api/src/main/xsd/TimestampMergePolicyConfiguration.xsd ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.api/src/main/xsd/TimestampMergePolicyConfiguration.xsd b/extras/rya.export/export.api/src/main/xsd/TimestampMergePolicyConfiguration.xsd new file mode 100644 index 0000000..181691a --- /dev/null +++ b/extras/rya.export/export.api/src/main/xsd/TimestampMergePolicyConfiguration.xsd @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<schema xmlns="http://www.w3.org/2001/XMLSchema" + xmlns:xs="http://www.w3.org/2001/XMLSchema" + xmlns:mc="http://mergeconfig" + targetNamespace="http://mergeconfig" + elementFormDefault="qualified"> + + <xs:include schemaLocation="./MergeConfiguration.xsd"/> + + <xs:complexType name="TimestampMergePolicyConfiguration"> + <xs:complexContent> + <xs:extension base="mc:MergeToolConfiguration"> + <xs:sequence> + <xs:element name="toolStartTime" type="xs:string" minOccurs="0"/> + </xs:sequence> + </xs:extension> + </xs:complexContent> + </xs:complexType> +</schema> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/conf/config.xml ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/conf/config.xml b/extras/rya.export/export.client/conf/config.xml new file mode 100644 index 0000000..57787b1 --- /dev/null +++ b/extras/rya.export/export.client/conf/config.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. --> +<MergeToolConfiguration xmlns="http://mergeconfig"> + <parentHostname>10.63.8.102</parentHostname> + <parentUsername>SPEAR</parentUsername> + <parentPassword>spear</parentPassword> + <parentRyaInstanceName>spear_instance</parentRyaInstanceName> + <parentTablePrefix>asmith_demo_export_</parentTablePrefix> + <parentTomcatUrl>http://10.63.8.102:8080</parentTomcatUrl> + <parentDBType>accumulo</parentDBType> + <parentPort>1111</parentPort> + <childHostname>localhost</childHostname> + <childRyaInstanceName>rya_demo_child</childRyaInstanceName> + <childTablePrefix>asmith_demo_export_</childTablePrefix> + <childTomcatUrl>http://localhost:8080</childTomcatUrl> + <childDBType>mongo</childDBType> + <childPort>27017</childPort> + <mergePolicy>timestamp</mergePolicy> + <useNtpServer>false</useNtpServer> + <toolStartTime>Sep 026 2016 15:21:30</toolStartTime> +</MergeToolConfiguration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/pom.xml b/extras/rya.export/export.client/pom.xml new file mode 100644 index 0000000..2ea0a30 --- /dev/null +++ b/extras/rya.export/export.client/pom.xml @@ -0,0 +1,123 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.export.parent</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.export.client</artifactId> + + <name>Apache Rya Export Client</name> + <description>A command line tool that lets a user import and export rya states.</description> + + <dependencies> + <!-- Rya Runtime Dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.export.api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.export.accumulo</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.export.mongo</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- 3rd Party Runtime Dependencies. --> + <dependency> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + </dependency> + + <dependency> + <groupId>com.toedter</groupId> + <artifactId>jcalendar</artifactId> + <version>1.1.4</version> + </dependency> + + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryrender</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-ntriples</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-trig</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-turtle</artifactId> + <version>${openrdf.sesame.version}</version> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Create an executable jar file for export. --> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + <manifest> + <addClasspath>true</addClasspath> + <classpathLayoutType>custom</classpathLayoutType> + <customClasspathLayout>WEB-INF/lib/$${artifact.groupIdPath}/$${artifact.artifactId}-$${artifact.version}$${dashClassifier?}.$${artifact.extension}</customClasspathLayout> + <mainClass>org.apache.rya.export.client.MergeDriverClient</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/MergeDriverClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/MergeDriverClient.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/MergeDriverClient.java new file mode 100644 index 0000000..992cc09 --- /dev/null +++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/MergeDriverClient.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.export.client; + +import static org.apache.rya.export.DBType.ACCUMULO; +import static org.apache.rya.export.MergePolicy.TIMESTAMP; + +import java.io.File; +import java.io.IOException; +import java.net.UnknownHostException; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Logger; +import org.apache.log4j.xml.DOMConfigurator; +import org.apache.rya.export.accumulo.AccumuloRyaStatementStore; +import org.apache.rya.export.api.MergerException; +import org.apache.rya.export.api.conf.MergeConfiguration; +import org.apache.rya.export.api.conf.MergeConfigurationException; +import org.apache.rya.export.api.conf.policy.TimestampPolicyMergeConfiguration; +import org.apache.rya.export.api.store.RyaStatementStore; +import org.apache.rya.export.client.conf.MergeConfigurationCLI; +import org.apache.rya.export.client.conf.TimeUtils; +import org.apache.rya.export.client.merge.MemoryTimeMerger; +import org.apache.rya.export.client.merge.StatementStoreFactory; +import org.apache.rya.export.client.merge.VisibilityStatementMerger; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.UpdateExecutionException; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; + +/** + * Drives the MergeTool. + */ +public class MergeDriverClient { + private static final Logger LOG = Logger.getLogger(MergeDriverClient.class); + private static MergeConfiguration configuration; + + public static void main(final String [] args) throws ParseException, + MergeConfigurationException, UnknownHostException, MergerException, + java.text.ParseException, SailException, AccumuloException, + AccumuloSecurityException, InferenceEngineException, RepositoryException, + MalformedQueryException, UpdateExecutionException { + + final String log4jConfiguration = System.getProperties().getProperty("log4j.configuration"); + if (StringUtils.isNotBlank(log4jConfiguration)) { + final String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:"); + final File configFile = new File(parsedConfiguration); + if (configFile.exists()) { + DOMConfigurator.configure(parsedConfiguration); + } else { + BasicConfigurator.configure(); + } + } + + final MergeConfigurationCLI config = new MergeConfigurationCLI(args); + try { + configuration = config.createConfiguration(); + } catch (final MergeConfigurationException e) { + LOG.error("Configuration failed.", e); + } + + final boolean useTimeSync = configuration.getUseNtpServer(); + Optional<Long> offset = Optional.absent(); + if (useTimeSync) { + final String tomcat = configuration.getChildTomcatUrl(); + final String ntpHost = configuration.getNtpServerHost(); + try { + offset = Optional.<Long>fromNullable(TimeUtils.getNtpServerAndMachineTimeDifference(ntpHost, tomcat)); + } catch (final IOException e) { + LOG.error("Unable to get time difference between time server: " + ntpHost + " and the server: " + tomcat, e); + } + } + + final StatementStoreFactory storeFactory = new StatementStoreFactory(configuration); + try { + final RyaStatementStore parentStore = storeFactory.getParentStatementStore(); + final RyaStatementStore childStore = storeFactory.getChildStatementStore(); + + LOG.info("Starting Merge Tool"); + if(configuration.getParentDBType() == ACCUMULO && configuration.getChildDBType() == ACCUMULO) { + final AccumuloRyaStatementStore childAStore = (AccumuloRyaStatementStore) childStore; + final AccumuloRyaStatementStore parentAStore = (AccumuloRyaStatementStore) parentStore; + + //do map reduce merging. + //TODO: Run Merger + } else { + if(configuration.getMergePolicy() == TIMESTAMP) { + final TimestampPolicyMergeConfiguration timeConfig = (TimestampPolicyMergeConfiguration) configuration; + final Long timeOffset; + if (offset.isPresent()) { + timeOffset = offset.get(); + } else { + timeOffset = 0L; + } + final MemoryTimeMerger merger = new MemoryTimeMerger(parentStore, childStore, + new VisibilityStatementMerger(), timeConfig.getToolStartTime(), + configuration.getParentRyaInstanceName(), timeOffset); + merger.runJob(); + } + } + } catch (final Exception e) { + LOG.error("Something went wrong creating a Rya Statement Store connection.", e); + } + + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(final Thread thread, final Throwable throwable) { + LOG.error("Uncaught exception in " + thread.getName(), throwable); + } + }); + + LOG.info("Finished running Merge Tool"); + System.exit(1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/DateTimePickerDialog.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/DateTimePickerDialog.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/DateTimePickerDialog.java new file mode 100644 index 0000000..eaa8822 --- /dev/null +++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/DateTimePickerDialog.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.export.client.conf; + +import java.awt.GridBagConstraints; +import java.awt.GridBagLayout; +import java.awt.Insets; +import java.awt.event.ActionEvent; +import java.awt.event.ActionListener; +import java.util.Calendar; +import java.util.Date; + +import javax.swing.BorderFactory; +import javax.swing.JButton; +import javax.swing.JDialog; +import javax.swing.JLabel; +import javax.swing.JPanel; +import javax.swing.JSpinner; +import javax.swing.JSpinner.DateEditor; +import javax.swing.SpinnerDateModel; +import javax.swing.SpinnerModel; +import javax.swing.WindowConstants; + +import com.toedter.calendar.JCalendar; + +/** + * Dialog for picking date and time. + */ +public class DateTimePickerDialog extends JDialog { + private static final long serialVersionUID = 1L; + + private JCalendar dateChooser; + private JSpinner timeSpinner; + + private Date selectedDateTime; + private final JLabel label; + + + /** + * Creates a new instance of {@link DateTimePickerDialog}. + * @param title the title to display up top. + * @param message the message to display. + */ + public DateTimePickerDialog(final String title, final String message) { + this(null, title, message); + } + + /** + * Creates a new instance of {@link DateTimePickerDialog}. + * @param date the initial date to have the dialog show. + * @param title the title to display up top. + * @param message the message to display. + */ + public DateTimePickerDialog(final Date date, final String title, final String message) { + // Create a modal dialog + super((JDialog) null); + setTitle(title); + setModalityType(ModalityType.APPLICATION_MODAL); + setType(Type.NORMAL); + + setLayout(new GridBagLayout()); + setDefaultCloseOperation(WindowConstants.DISPOSE_ON_CLOSE); + + final JButton okButton = new JButton("OK"); + okButton.addActionListener (new ActionListener() { + @Override + public void actionPerformed(final ActionEvent event) { + selectedDateTime = findSelectedDateTime(); + + // Hide dialog + setVisible(false); + } + }); + + getRootPane().setDefaultButton(okButton); + + final JPanel dateTimePanel = buildDateTimePanel(date); + label = new JLabel (message); + label.setBorder(BorderFactory.createEtchedBorder()); + + final GridBagConstraints c = new GridBagConstraints(); + c.fill = GridBagConstraints.HORIZONTAL; + c.insets = new Insets(5, 5, 5, 5); + c.gridx = 0; + c.gridy = 0; + + add(dateTimePanel, c); + c.gridy++; + add(label, c); + c.anchor = GridBagConstraints.EAST; + c.fill = GridBagConstraints.NONE; + c.gridy++; + add(okButton, c); + + pack(); + } + + private JPanel buildDateTimePanel(final Date date) { + final JPanel datePanel = new JPanel(); + + dateChooser = new JCalendar(); + if (date != null) { + final Calendar calendar = Calendar.getInstance(); + calendar.setTime(date); + dateChooser.setCalendar(calendar); + } + + datePanel.add(dateChooser); + + final SpinnerModel model = new SpinnerDateModel(); + timeSpinner = new JSpinner(model); + final DateEditor editor = new DateEditor(timeSpinner, "HH:mm:ss"); + timeSpinner.setEditor(editor); + if (date != null) { + timeSpinner.setValue(date); + } + + datePanel.add(timeSpinner); + + return datePanel; + } + + private Date findSelectedDateTime() { + // Get the values from the date chooser + final int day = dateChooser.getDayChooser().getDay(); + final int month = dateChooser.getMonthChooser().getMonth(); + final int year = dateChooser.getYearChooser().getYear(); + + // Get the values from the time chooser + final Calendar timeCalendar = Calendar.getInstance(); + timeCalendar.setTime((Date) timeSpinner.getValue()); + final int hour = timeCalendar.get(Calendar.HOUR_OF_DAY); + final int minute = timeCalendar.get(Calendar.MINUTE); + final int second = timeCalendar.get(Calendar.SECOND); + + // Combine these values into a single date object + final Calendar newCalendar = Calendar.getInstance(); + newCalendar.set(Calendar.YEAR, year); + newCalendar.set(Calendar.MONTH, month); + newCalendar.set(Calendar.DATE, day); + newCalendar.set(Calendar.HOUR_OF_DAY, hour); + newCalendar.set(Calendar.MINUTE, minute); + newCalendar.set(Calendar.SECOND, second); + + final Date newDate = newCalendar.getTime(); + + return newDate; + } + + /** + * @return the selected date time. + */ + public Date getSelectedDateTime() { + return selectedDateTime; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/MergeConfigHadoopAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/MergeConfigHadoopAdapter.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/MergeConfigHadoopAdapter.java new file mode 100644 index 0000000..c51d637 --- /dev/null +++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/MergeConfigHadoopAdapter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.export.client.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.export.api.conf.MergeConfiguration; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; + +/** + * Adapts the {@link MergeConfiguration} to the hadoop {@link Configuration}. + */ +public class MergeConfigHadoopAdapter { + public static MongoDBRdfConfiguration getMongoConfiguration(final MergeConfiguration config) { + final MongoDBRdfConfiguration configuration = new MongoDBRdfConfiguration(); + configuration.setMongoInstance(config.getChildHostname()); + configuration.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, config.getChildPort() + ""); + configuration.set(MongoDBRdfConfiguration.MONGO_DB_NAME, config.getChildRyaInstanceName()); + return configuration; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/MergeConfigurationCLI.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/MergeConfigurationCLI.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/MergeConfigurationCLI.java new file mode 100644 index 0000000..f0c5136 --- /dev/null +++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/MergeConfigurationCLI.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.export.client.conf; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.export.MergePolicy.TIMESTAMP; + +import java.io.File; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.rya.export.AccumuloMergeToolConfiguration; +import org.apache.rya.export.DBType; +import org.apache.rya.export.InstanceType; +import org.apache.rya.export.MergePolicy; +import org.apache.rya.export.MergeToolConfiguration; +import org.apache.rya.export.TimestampMergePolicyConfiguration; +import org.apache.rya.export.api.conf.AccumuloMergeConfiguration; +import org.apache.rya.export.api.conf.ConfigurationAdapter; +import org.apache.rya.export.api.conf.MergeConfiguration; +import org.apache.rya.export.api.conf.MergeConfigurationException; +import org.apache.rya.export.api.conf.policy.TimestampPolicyMergeConfiguration; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Helper class for processing command line arguments for the Merge Tool. + */ +public class MergeConfigurationCLI { + private static final String DIALOG_TITLE = "Select a Start Time/Date"; + private static final String DIALOG_MESSAGE = + "<html>Choose the time of the data to merge.<br>Only data modified AFTER the selected time will be merged.</html>"; + + private static final Option CONFIG_OPTION = new Option("c", true, "Defines the configuration file for the Merge Tool to use."); + private static final Option TIME_OPTION = new Option("t", true, "Defines the timestamp from which to filter RyaStatements when merging."); + private static final Option PARENT_HOST_OPTION = new Option("a", "pHost", true, "Defines the hostname of the parent db to connect to."); + private static final Option PARENT_USER_OPTION = new Option("b", "pUser", true, "Defines the username to connect to the parent DB."); + private static final Option PARENT_PSWD_OPTION = new Option("d", "pPswd", true, "Defines the password to connect to the parent DB."); + private static final Option PARENT_RYA_OPTION = new Option("e", "pRya", true, "Defines the rya instance name of the parent DB."); + private static final Option PARENT_PREFIX_OPTION = new Option("f", "pPrefix", true, "Defines the table prefix of the parent DB."); + private static final Option PARENT_TOMCAT_OPTION = new Option("g", "pTomcat", true, "Defines the location of Tomcat for the parent DB."); + private static final Option PARENT_DB_OPTION = new Option("h", "pDB", true, "Defines the type of database the parent is."); + private static final Option PARENT_PORT_OPTION = new Option("i", "pPort", true, "Defines the port of the parent DB to connect to."); + private static final Option PARENT_ACCUMULO_ZOOKEEPERS_OPTION = new Option("j", "paZookeepers", true, "Defines the location of the zookeepers."); + private static final Option PARENT_ACCUMULO_AUTHS_OPTION = new Option("k", "paAuths", true, "Defines the authorization level of the user."); + private static final Option PARENT_ACCUMULO_TYPE_OPTION = new Option("l", "paType", true, "Defines the type of accumulo to connect to."); + private static final Option CHILD_HOST_OPTION = new Option("m", "cHost", true, "Defines the hostname of the child db to connect to."); + private static final Option CHILD_USER_OPTION = new Option("n", "cUser", true, "Defines the username to connect to the child DB."); + private static final Option CHILD_PSWD_OPTION = new Option("o", "cPswd", true, "Defines the password to connect to the child DB."); + private static final Option CHILD_RYA_OPTION = new Option("p", "cRya", true, "Defines the rya instance name of the child DB."); + private static final Option CHILD_PREFIX_OPTION = new Option("q", "cPrefix", true, "Defines the table prefix of the child DB."); + private static final Option CHILD_TOMCAT_OPTION = new Option("r", "cTomcat", true, "Defines the location of Tomcat for the child DB."); + private static final Option CHILD_DB_OPTION = new Option("s", "cDB", true, "Defines the type of database the child is."); + private static final Option CHILD_PORT_OPTION = new Option("u", "cPort", true, "Defines the port of the child DB to connect to."); + private static final Option CHILD_ACCUMULO_ZOOKEEPERS_OPTION = new Option("v", "caZookeepers", true, "Defines the location of the zookeepers."); + private static final Option CHILD_ACCUMULO_AUTHS_OPTION = new Option("w", "caAuths", true, "Defines the authorization level of the user."); + private static final Option CHILD_ACCUMULO_TYPE_OPTION = new Option("x", "caType", true, "Defines the type of accumulo to connect to."); + private static final Option MERGE_OPTION = new Option("y", "merge", true, "Defines the type of merging that should occur."); + private static final Option NTP_OPTION = new Option("z", "useNTP", true, "Defines if NTP should be used to synch time."); + public static final DateFormat DATE_FORMAT = new SimpleDateFormat("MMM ddd yyy HH:mm:ss"); + private final CommandLine cmd; + + private MergeConfiguration configuration; + /** + * + * @param args + * @throws MergeConfigurationException + */ + public MergeConfigurationCLI(final String[] args) throws MergeConfigurationException { + checkNotNull(args); + + final Options cliOptions = getOptions(); + final CommandLineParser parser = new BasicParser(); + try { + cmd = parser.parse(cliOptions, args); + } catch (final ParseException pe) { + throw new MergeConfigurationException("Improperly formatted options.", pe); + } + } + + /** + * @return The valid {@link Options} + */ + @VisibleForTesting + public static Options getOptions() { + final Options cliOptions = new Options() + .addOption(TIME_OPTION) + .addOption(CONFIG_OPTION) + .addOption(PARENT_DB_OPTION) + .addOption(PARENT_HOST_OPTION) + .addOption(PARENT_PORT_OPTION) + .addOption(PARENT_PREFIX_OPTION) + .addOption(PARENT_PSWD_OPTION) + .addOption(PARENT_RYA_OPTION) + .addOption(PARENT_TOMCAT_OPTION) + .addOption(PARENT_USER_OPTION) + .addOption(PARENT_ACCUMULO_AUTHS_OPTION) + .addOption(PARENT_ACCUMULO_TYPE_OPTION) + .addOption(PARENT_ACCUMULO_ZOOKEEPERS_OPTION) + .addOption(CHILD_DB_OPTION) + .addOption(CHILD_HOST_OPTION) + .addOption(CHILD_PORT_OPTION) + .addOption(CHILD_PREFIX_OPTION) + .addOption(CHILD_PSWD_OPTION) + .addOption(CHILD_RYA_OPTION) + .addOption(CHILD_TOMCAT_OPTION) + .addOption(CHILD_USER_OPTION) + .addOption(CHILD_ACCUMULO_AUTHS_OPTION) + .addOption(CHILD_ACCUMULO_TYPE_OPTION) + .addOption(CHILD_ACCUMULO_ZOOKEEPERS_OPTION) + .addOption(MERGE_OPTION) + .addOption(NTP_OPTION); + return cliOptions; + } + + public static MergeToolConfiguration createConfigurationFromFile(final File configFile) throws MergeConfigurationException { + try { + final JAXBContext context = JAXBContext.newInstance(DBType.class, MergeToolConfiguration.class, AccumuloMergeToolConfiguration.class, TimestampMergePolicyConfiguration.class, MergePolicy.class, InstanceType.class); + final Unmarshaller unmarshaller = context.createUnmarshaller(); + return (MergeToolConfiguration) unmarshaller.unmarshal(configFile); + } catch (final JAXBException | IllegalArgumentException JAXBe) { + throw new MergeConfigurationException("Failed to create a config based on the provided configuration.", JAXBe); + } + } + + public Date getRyaStatementMergeTime() throws MergeConfigurationException { + final Date time; + if(cmd.hasOption(TIME_OPTION.getOpt())) { + final String dateStr = cmd.getOptionValue(TIME_OPTION.getOpt()); + try { + time = DATE_FORMAT.parse(dateStr); + } catch (final java.text.ParseException e) { + throw new MergeConfigurationException("The provided timestamp was not formatted correctly.", e); + } + } else { + final DateTimePickerDialog dialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE); + dialog.setVisible(true); + time = dialog.getSelectedDateTime(); + } + return time; + } + + /** + * Attempts to create the {@link MergeConfiguration} based on the provided + * configuration file. + * @return The {@link MergeConfiguration} created. + * @throws MergeConfigurationException - Thrown when the provided file is + * not formatted properly for a {@link MergeConfiguration}. + */ + public MergeConfiguration createConfiguration() throws MergeConfigurationException { + if(configuration == null) { + //If the config option is present, ignore all other options. + if(cmd.hasOption(CONFIG_OPTION.getOpt())) { + final File xml = new File(cmd.getOptionValue(CONFIG_OPTION.getOpt())); + final ConfigurationAdapter adapter = new ConfigurationAdapter(); + configuration = adapter.createConfig(createConfigurationFromFile(xml)); + } else { + final DBType parentType = DBType.fromValue(cmd.getOptionValue(PARENT_DB_OPTION.getLongOpt())); + final DBType childType = DBType.fromValue(cmd.getOptionValue(CHILD_DB_OPTION.getLongOpt())); + final MergePolicy mergePolicy = MergePolicy.fromValue(cmd.getOptionValue(MERGE_OPTION.getLongOpt())); + MergeConfiguration.Builder builder = new MergeConfiguration.Builder() + .setParentHostname(cmd.getOptionValue(PARENT_HOST_OPTION.getLongOpt())) + .setParentUsername(cmd.getOptionValue(PARENT_USER_OPTION.getLongOpt())) + .setParentPassword(cmd.getOptionValue(PARENT_PSWD_OPTION.getLongOpt())) + .setParentRyaInstanceName(cmd.getOptionValue(PARENT_RYA_OPTION.getLongOpt())) + .setParentTablePrefix(cmd.getOptionValue(PARENT_PREFIX_OPTION.getLongOpt())) + .setParentTomcatUrl(cmd.getOptionValue(PARENT_TOMCAT_OPTION.getLongOpt())) + .setParentDBType(parentType) + .setParentPort(Integer.parseInt(cmd.getOptionValue(PARENT_PORT_OPTION.getLongOpt()))) + .setChildHostname(cmd.getOptionValue(CHILD_HOST_OPTION.getLongOpt())) + .setChildUsername(cmd.getOptionValue(CHILD_USER_OPTION.getLongOpt())) + .setChildPassword(cmd.getOptionValue(CHILD_PSWD_OPTION.getLongOpt())) + .setChildRyaInstanceName(cmd.getOptionValue(CHILD_RYA_OPTION.getLongOpt())) + .setChildTablePrefix(cmd.getOptionValue(CHILD_PREFIX_OPTION.getLongOpt())) + .setChildTomcatUrl(cmd.getOptionValue(CHILD_TOMCAT_OPTION.getLongOpt())) + .setChildDBType(childType) + .setChildPort(Integer.parseInt(cmd.getOptionValue(CHILD_PORT_OPTION.getLongOpt()))) + .setMergePolicy(mergePolicy); + if (mergePolicy == TIMESTAMP) { + builder = new TimestampPolicyMergeConfiguration.TimestampPolicyBuilder(builder) + .setToolStartTime(cmd.getOptionValue(TIME_OPTION.getLongOpt())); + } + if (parentType == DBType.ACCUMULO) { + builder = new AccumuloMergeConfiguration.AccumuloBuilder(builder) + .setParentZookeepers(cmd.getOptionValue(PARENT_ACCUMULO_ZOOKEEPERS_OPTION.getLongOpt())) + .setParentAuths(cmd.getOptionValue(PARENT_ACCUMULO_AUTHS_OPTION.getLongOpt())) + .setParentInstanceType(InstanceType.fromValue(cmd.getOptionValue(PARENT_ACCUMULO_TYPE_OPTION.getLongOpt()))); + } + if (childType == DBType.ACCUMULO) { + builder = new AccumuloMergeConfiguration.AccumuloBuilder(builder) + .setChildZookeepers(cmd.getOptionValue(CHILD_ACCUMULO_ZOOKEEPERS_OPTION.getLongOpt())) + .setChildAuths(cmd.getOptionValue(CHILD_ACCUMULO_AUTHS_OPTION.getLongOpt())) + .setChildInstanceType(InstanceType.fromValue(cmd.getOptionValue(CHILD_ACCUMULO_TYPE_OPTION.getLongOpt()))); + } + configuration = builder.build(); + } + } + return configuration; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/TimeUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/TimeUtils.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/TimeUtils.java new file mode 100644 index 0000000..596e98d --- /dev/null +++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/conf/TimeUtils.java @@ -0,0 +1,348 @@ +package org.apache.rya.export.client.conf; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.net.ntp.NTPUDPClient; +import org.apache.commons.net.ntp.TimeInfo; +import org.apache.log4j.Logger; +import org.codehaus.plexus.util.StringUtils; +import org.mortbay.jetty.HttpMethods; + +import com.google.common.net.HttpHeaders; + +/** + * Utility methods for time. + */ +public final class TimeUtils { + private static final Logger log = Logger.getLogger(TimeUtils.class); + + /** + * The default host name of the time server to use. + * List of time servers: http://tf.nist.gov/tf-cgi/servers.cgi + * Do not query time server more than once every 4 seconds. + */ + public static final String DEFAULT_TIME_SERVER_HOST = "time.nist.gov"; + + private static final int NTP_SERVER_TIMEOUT_MS = 15000; + + /** + * Queries the default NTP Server for the time. + * Do not query time server more than once every 4 seconds. + * @return the NTP server {@link Date} or {@code null}. + * @throws IOException + */ + public static Date getDefaultNtpServerDate() throws IOException { + return getNtpServerDate(DEFAULT_TIME_SERVER_HOST); + } + + /** + * Queries the specified NTP Server for the time. + * Do not query time server more than once every 4 seconds. + * @param timeServerHost the time server host name. + * @return the NTP server {@link Date} or {@code null}. + * @throws IOException + */ + public static Date getNtpServerDate(final String timeServerHost) throws IOException { + try { + TimeInfo timeInfo = null; + final NTPUDPClient timeClient = new NTPUDPClient(); + timeClient.setDefaultTimeout(NTP_SERVER_TIMEOUT_MS); + final InetAddress inetAddress = InetAddress.getByName(timeServerHost); + if (inetAddress != null) { + timeInfo = timeClient.getTime(inetAddress); + if (timeInfo != null) { + // TODO: which time to use? + final long serverTime = timeInfo.getMessage().getTransmitTimeStamp().getTime(); + //long serverTime = timeInfo.getReturnTime(); + final Date ntpDate = new Date(serverTime); + return ntpDate; + } + } + } catch (final IOException e) { + throw new IOException("Unable to get NTP server time.", e); + } + return null; + } + + /** + * Gets the remote machine's system time by checking the DATE field in the header + * from a HTTP HEAD method response. + * @param urlString the URL string of the remote machine's web server to connect to. + * @return the remote machine's system {@link Date} or {@code null}. + * @throws IOException + * @throws ParseException + */ + public static Date getRemoteMachineDate(final String urlString) throws IOException, ParseException { + Date remoteDate = null; + HttpURLConnection conn = null; + try { + final URL url = new URL(urlString); + + // Set up the initial connection + conn = (HttpURLConnection)url.openConnection(); + // Use HEAD instead of GET so content isn't returned. + conn.setRequestMethod(HttpMethods.HEAD); + conn.setDoOutput(false); + conn.setReadTimeout(10000); + + conn.connect(); + + final Map<String, List<String>> header = conn.getHeaderFields(); + for (final String key : header.keySet()) { + if (key != null && HttpHeaders.DATE.equals(key)) { + final List<String> data = header.get(key); + final String dateString = data.get(0); + final SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss Z"); + remoteDate = sdf.parse(dateString); + break; + } + } + } finally { + // Close the connection + conn.disconnect(); + } + + return remoteDate; + } + + /** + * Gets the time difference between the 2 specified times from the NTP server time and the machine system time. + * @param ntpDate the {@link Date} from the NTP server host. + * @param machineDate the {@link Date} from the machine (either local or remote). + * @param isMachineLocal {@code true} if the {@code machineDate} from a local machine. {@code false} + * if it's from a remote machine. + * @return the difference between the NTP server time and the machine's system time. A positive value + * indicates that the machine's system time is ahead of the time server. A negative value indicates that + * the machine's system time is behind of the time server. + */ + public static Long getTimeDifference(final Date ntpDate, final Date machineDate, final boolean isMachineLocal) { + Long diff = null; + if (ntpDate != null && machineDate != null) { + log.info("NTP Server Time: " + ntpDate); + final String machineLabel = isMachineLocal ? "Local" : "Remote"; + log.info(machineLabel + " Machine Time: " + machineDate); + diff = machineDate.getTime() - ntpDate.getTime(); + + final boolean isAhead = diff > 0; + final String durationBreakdown = TimeUtils.getDurationBreakdown(diff, false); + log.info(machineLabel + " Machine time is " + (isAhead ? "ahead of" : "behind") + " NTP server time by " + durationBreakdown + "."); + } + + return diff; + } + + /** + * Gets the time difference between the NTP server and the local machine system time. + * @param timeServerHost the time server host name. + * @return the difference between the NTP server time and the local machine's system time. A positive value + * indicates that the local machine's system time is ahead of the time server. A negative value indicates that + * the local machine's system time is behind of the time server. + * @throws IOException + */ + public static Long getNtpServerAndLocalMachineTimeDifference(final String timeServerHost) throws IOException { + log.info("Getting NTP Server time from " + timeServerHost + "..."); + final Date ntpDate = getNtpServerDate(timeServerHost); + Long diff = null; + if (ntpDate != null) { + log.info("Getting Local Machine time..."); + final Date machineDate = new Date(); + + diff = getTimeDifference(ntpDate, machineDate, true); + } + + return diff; + } + + /** + * Gets the time difference between the NTP server and the remote machine system time. + * @param timeServerHost the time server host name. + * @param remoteMachineUrlString the URL string of the remote machine's web server to connect to. + * @return the difference between the NTP server time and the remote machine's system time. A positive value + * indicates that the remote machine's system time is ahead the time server. A negative value indicates that + * the remote machine's system time is behind the time server. + * @throws ParseException + * @throws IOException + */ + public static Long getNtpServerAndRemoteMachineTimeDifference(final String timeServerHost, final String remoteMachineUrlString) throws IOException, ParseException { + log.info("Getting NTP Server time from " + timeServerHost + "..."); + final Date ntpDate = getNtpServerDate(timeServerHost); + Long diff = null; + if (ntpDate != null) { + log.info("Getting Remote Machine time from " + remoteMachineUrlString + "..."); + final Date machineDate = getRemoteMachineDate(remoteMachineUrlString); + + diff = getTimeDifference(ntpDate, machineDate, false); + } + + return diff; + } + + /** + * Gets the time difference between the NTP server and the machine system time (either locally or remotely depending on the URL). + * @param timeServerHost the time server host name. + * @param machineUrlString the URL string of the machine's web server to connect to. The machine might be + * local or remote. + * @return the difference between the NTP server time and the machine's system time. A positive value + * indicates that the machine's system time is ahead of the time server. A negative value indicates that + * the machine's system time is behind the time server. + * @throws ParseException + * @throws IOException + */ + public static Long getNtpServerAndMachineTimeDifference(final String timeServerHost, final String machineUrlString) throws IOException, ParseException { + final boolean isUrlLocalMachine = isUrlLocalMachine(machineUrlString); + + Long machineTimeOffset; + if (isUrlLocalMachine) { + machineTimeOffset = getNtpServerAndLocalMachineTimeDifference(timeServerHost); + } else { + machineTimeOffset = getNtpServerAndRemoteMachineTimeDifference(timeServerHost, machineUrlString); + } + + return machineTimeOffset; + } + + /** + * Gets the machine system time (either locally or remotely depending on the URL). + * @param urlString the URL string of the machine to check. + * @return the machine's system time. + * @throws IOException + * @throws ParseException + */ + public static Date getMachineDate(final String urlString) throws IOException, ParseException { + final boolean isMachineLocal = isUrlLocalMachine(urlString); + + Date machineDate; + if (isMachineLocal) { + // Get local system machine time + machineDate = new Date(); + } else { + // Get remote machine time from HTTP HEAD response. Check hosted server web page on machine for time. + machineDate = getRemoteMachineDate(urlString); + } + + return machineDate; + } + + /** + * Checks to see if the URL provided is hosted on the local machine or not. + * @param urlString the URL string to check. + * @return {@code true} if the URL is hosted on the local machine. {@code false} + * if it's on a remote machine. + * @throws UnknownHostException + * @throws MalformedURLException + */ + public static boolean isUrlLocalMachine(final String urlString) throws UnknownHostException, MalformedURLException { + final String localAddress = InetAddress.getLocalHost().getHostAddress(); + final String requestAddress = InetAddress.getByName(new URL(urlString).getHost()).getHostAddress(); + return localAddress != null && requestAddress != null && localAddress.equals(requestAddress); + } + + /** + * Convert a millisecond duration to a string format. + * @param durationMs A duration to convert to a string form. + * @return A string of the form "X Days Y Hours Z Minutes A Seconds B Milliseconds". + */ + public static String getDurationBreakdown(final long durationMs) { + return getDurationBreakdown(durationMs, true); + } + + /** + * Convert a millisecond duration to a string format. + * @param durationMs A duration to convert to a string form. + * @param showSign {@code true} to show if the duration is positive or negative. {@code false} + * to not display the sign. + * @return A string of the form "X Days Y Hours Z Minutes A Seconds B Milliseconds". + */ + public static String getDurationBreakdown(final long durationMs, final boolean showSign) { + long tempDurationMs = Math.abs(durationMs); + + final long days = TimeUnit.MILLISECONDS.toDays(tempDurationMs); + tempDurationMs -= TimeUnit.DAYS.toMillis(days); + final long hours = TimeUnit.MILLISECONDS.toHours(tempDurationMs); + tempDurationMs -= TimeUnit.HOURS.toMillis(hours); + final long minutes = TimeUnit.MILLISECONDS.toMinutes(tempDurationMs); + tempDurationMs -= TimeUnit.MINUTES.toMillis(minutes); + final long seconds = TimeUnit.MILLISECONDS.toSeconds(tempDurationMs); + tempDurationMs -= TimeUnit.SECONDS.toMillis(seconds); + final long milliseconds = TimeUnit.MILLISECONDS.toMillis(tempDurationMs); + + final StringBuilder sb = new StringBuilder(); + if (tempDurationMs != 0 && showSign) { + sb.append(tempDurationMs > 0 ? "+" : "-"); + } + if (days > 0) { + sb.append(days); + sb.append(days == 1 ? " Day " : " Days "); + } + if (hours > 0) { + sb.append(hours); + sb.append(hours == 1 ? " Hour " : " Hours "); + } + if (minutes > 0) { + sb.append(minutes); + sb.append(minutes == 1 ? " Minute " : " Minutes "); + } + if (seconds > 0) { + sb.append(seconds); + sb.append(seconds == 1 ? " Second " : " Seconds " ); + } + if (milliseconds > 0 || (!showSign && sb.length() == 0) || (showSign && sb.length() == 1)) { + // At least show the milliseconds if nothing else has been shown so far + sb.append(milliseconds); + sb.append(milliseconds == 1 ? " Millisecond" : " Milliseconds"); + } + + return StringUtils.trim(sb.toString()); + } + + /** + * Checks if a date is before another date or if they are equal. + * @param date1 the first {@link Date}. + * @param date2 the second {@link Date}. + * @return {@code true} if {@code date1} is before or equal to {@code date2}. {@code false} otherwise. + */ + public static boolean dateBeforeInclusive(final Date date1, final Date date2) { + return !date1.after(date2); + } + + /** + * Checks if a date is after another date or if they are equal. + * @param date1 the first {@link Date}. + * @param date2 the second {@link Date}. + * @return {@code true} if {@code date1} is after or equal to {@code date2}. {@code false} otherwise. + */ + public static boolean dateAfterInclusive(final Date date1, final Date date2) { + return !date1.before(date2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/MemoryTimeMerger.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/MemoryTimeMerger.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/MemoryTimeMerger.java new file mode 100644 index 0000000..dc0a148 --- /dev/null +++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/MemoryTimeMerger.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.export.client.merge; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Date; +import java.util.Iterator; +import java.util.Optional; + +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.export.api.Merger; +import org.apache.rya.export.api.StatementMerger; +import org.apache.rya.export.api.metadata.MergeParentMetadata; +import org.apache.rya.export.api.metadata.ParentMetadataExistsException; +import org.apache.rya.export.api.store.AddStatementException; +import org.apache.rya.export.api.store.ContainsStatementException; +import org.apache.rya.export.api.store.FetchStatementException; +import org.apache.rya.export.api.store.RemoveStatementException; +import org.apache.rya.export.api.store.RyaStatementStore; + +/** + * An in memory {@link Merger}. Merges {@link RyaStatement}s from a parent + * to a child. The statements merged will be any that have a timestamp after + * the provided time. If there are any conflicting statements, the provided + * {@link StatementMerger} will merge the statements and produce the desired + * {@link RyaStatement}. + */ +public class MemoryTimeMerger implements Merger { + private static final Logger LOG = Logger.getLogger(MemoryTimeMerger.class); + + private final RyaStatementStore parentStore; + private final RyaStatementStore childStore; + private final StatementMerger statementMerger; + private final Date timestamp; + private final String ryaInstanceName; + private final Long timeOffset; + + /** + * Creates a new {@link MemoryTimeMerger} to merge the statements from the parent to a child. + * @param parentStore + * @param childStore + * @param childMetadata + * @param parentMetadata + * @param statementMerger + * @param timestamp - The timestamp from which all parent statements will be merged into the child. + */ + public MemoryTimeMerger(final RyaStatementStore parentStore, final RyaStatementStore childStore, + final StatementMerger statementMerger, final Date timestamp, final String ryaInstanceName, + final Long timeOffset) { + this.parentStore = checkNotNull(parentStore); + this.childStore = checkNotNull(childStore); + this.statementMerger = checkNotNull(statementMerger); + this.timestamp = checkNotNull(timestamp); + this.ryaInstanceName = checkNotNull(ryaInstanceName); + this.timeOffset = checkNotNull(timeOffset); + } + + @Override + public void runJob() { + final Optional<MergeParentMetadata> metadata = parentStore.getParentMetadata(); + + //check the parent for a parent metadata repo + if(metadata.isPresent()) { + LOG.info("Merging statements..."); + final MergeParentMetadata parentMetadata = metadata.get(); + if(parentMetadata.getRyaInstanceName().equals(ryaInstanceName)) { + try { + importStatements(parentMetadata); + } catch (AddStatementException | ContainsStatementException | RemoveStatementException | FetchStatementException e) { + LOG.error("Failed to import statements.", e); + } + } + } else { + try { + LOG.info("Cloning statements..."); + export(); + } catch (final ParentMetadataExistsException | FetchStatementException e) { + LOG.error("Failed to export statements.", e); + } + } + } + + /** + * Exports all statements after the provided timestamp. + * @throws ParentMetadataExistsException - + * @throws FetchStatementException + */ + private void export() throws ParentMetadataExistsException, FetchStatementException { + LOG.info("Creating parent metadata in the child."); + //setup parent metadata repo in the child + final MergeParentMetadata metadata = new MergeParentMetadata.Builder() + .setRyaInstanceName(ryaInstanceName) + .setTimestamp(new Date()) + .setParentTimeOffset(timeOffset) + .setFilterTimestmap(timestamp) + .build(); + childStore.setParentMetadata(metadata); + + //fetch all statements after timestamp from the parent + final Iterator<RyaStatement> statements = parentStore.fetchStatements(); + LOG.info("Exporting statements."); + while(statements.hasNext()) { + System.out.print("."); + final RyaStatement statement = statements.next(); + try { + childStore.addStatement(statement); + } catch (final AddStatementException e) { + LOG.error("Failed to add statement: " + statement + " to the statement store.", e); + } + } + } + + private void importStatements(final MergeParentMetadata metadata) throws AddStatementException, ContainsStatementException, RemoveStatementException, FetchStatementException { + LOG.info("Importing statements."); + final Iterator<RyaStatement> parentStatements = parentStore.fetchStatements(); + final Iterator<RyaStatement> childStatements = childStore.fetchStatements(); + //statements are in order by timestamp. + + //Remove statements that were removed in the child. + //after the timestamp has passed, there is no need to keep checking the parent + while(childStatements.hasNext()) { + final RyaStatement statement = childStatements.next(); + if(statement.getTimestamp() > metadata.getTimestamp().getTime()) { + break; + } + if(!parentStore.containsStatement(statement)) { + System.out.println(statement.toString()); + childStore.removeStatement(statement); + } + } + + long curTime = -1L; + //Add all of the child statements that are not in the parent + while(parentStatements.hasNext()) { + final RyaStatement statement = parentStatements.next(); + curTime = statement.getTimestamp(); + if(!childStore.containsStatement(statement)) { + statement.setTimestamp(statement.getTimestamp() - timeOffset); + childStore.addStatement(statement); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/StatementStoreFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/StatementStoreFactory.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/StatementStoreFactory.java new file mode 100644 index 0000000..c96a2b6 --- /dev/null +++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/StatementStoreFactory.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.export.client.merge; + +import static java.util.Objects.requireNonNull; + +import java.util.Date; + +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.export.DBType; +import org.apache.rya.export.InstanceType; +import org.apache.rya.export.MergePolicy; +import org.apache.rya.export.accumulo.AccumuloRyaStatementStore; +import org.apache.rya.export.accumulo.policy.TimestampPolicyAccumuloRyaStatementStore; +import org.apache.rya.export.accumulo.util.AccumuloInstanceDriver; +import org.apache.rya.export.api.conf.AccumuloMergeConfiguration; +import org.apache.rya.export.api.conf.MergeConfiguration; +import org.apache.rya.export.api.conf.policy.TimestampPolicyMergeConfiguration; +import org.apache.rya.export.api.store.RyaStatementStore; +import org.apache.rya.export.client.conf.MergeConfigHadoopAdapter; +import org.apache.rya.export.mongo.MongoRyaStatementStore; +import org.apache.rya.export.mongo.policy.TimestampPolicyMongoRyaStatementStore; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.MongoDBRyaDAO; + +import com.mongodb.MongoClient; + +/** + * Factory for creating {@link RyaStatementStore}s based on the {@link MergeConfiguration}. + */ +public class StatementStoreFactory { + private final MergeConfiguration configuration; + + public StatementStoreFactory(final MergeConfiguration configuration) { + this.configuration = requireNonNull(configuration); + } + + /** + * Builds and retrieves the Parent {@link RyaStatementStore}. + * @return The created {@link RyaStatementStore} that connects to the Parent rya. + * @throws Exception - Something went wrong creating the {@link RyaStatementStore}. + */ + public RyaStatementStore getParentStatementStore() throws Exception { + final DBType dbType = configuration.getParentDBType(); + final String ryaInstanceName = configuration.getParentRyaInstanceName(); + RyaStatementStore store = getBaseStatementStore(dbType, + configuration.getParentHostname(), + configuration.getParentPort(), + ryaInstanceName, + configuration.getParentTablePrefix(), + configuration, true); + store = getMergePolicyStatementStore(store, + configuration.getMergePolicy(), + ryaInstanceName, + dbType); + return store; + } + + public RyaStatementStore getChildStatementStore() throws Exception { + final RyaStatementStore store = getBaseStatementStore(configuration.getChildDBType(), + configuration.getChildHostname(), + configuration.getChildPort(), + configuration.getChildRyaInstanceName(), + configuration.getChildTablePrefix(), + configuration, false); + return store; + } + + /** + * @param isParent + * @param config + * These parameters are hacks until the Accumulo DAO only accepts a connector. + * Once that happens this will be much, much cleaner, and make the {@link AccumuloInstanceDriver} + * obsolete. + * @throws Exception + */ + private RyaStatementStore getBaseStatementStore(final DBType dbType, + final String hostname, final int port, final String ryaInstancename, + final String tablePrefix, final MergeConfiguration config, final boolean isParent) throws Exception { + RyaStatementStore store; + if(dbType == DBType.MONGO) { + store = getBaseMongoStore(hostname, port, ryaInstancename); + } else { + final AccumuloMergeConfiguration aConfig = (AccumuloMergeConfiguration) config; + final InstanceType type = isParent ? aConfig.getParentInstanceType() : aConfig.getChildInstanceType(); + store = getBaseAccumuloStore(ryaInstancename, type, isParent, ryaInstancename, tablePrefix, tablePrefix, tablePrefix, tablePrefix); + } + return store; + } + + private RyaStatementStore getMergePolicyStatementStore(final RyaStatementStore store, final MergePolicy policy, final String ryaInstanceName, final DBType dbType) { + RyaStatementStore policyStore = null; + if(policy == MergePolicy.TIMESTAMP) { + final TimestampPolicyMergeConfiguration timeConfig = (TimestampPolicyMergeConfiguration) configuration; + final Date timestamp = timeConfig.getToolStartTime(); + if(dbType == DBType.MONGO) { + policyStore = new TimestampPolicyMongoRyaStatementStore((MongoRyaStatementStore) store, timestamp, ryaInstanceName); + } else { + policyStore = new TimestampPolicyAccumuloRyaStatementStore((AccumuloRyaStatementStore) store, timestamp); + } + } + return policyStore == null ? store : policyStore; + } + + private MongoRyaStatementStore getBaseMongoStore(final String hostname, final int port, final String ryaInstanceName) throws RyaDAOException { + final MongoClient client = new MongoClient(hostname, port); + final MongoDBRyaDAO dao = new MongoDBRyaDAO(new MongoDBRdfConfiguration(MergeConfigHadoopAdapter.getMongoConfiguration(configuration)), client); + return new MongoRyaStatementStore(client, ryaInstanceName, dao); + } + + private AccumuloRyaStatementStore getBaseAccumuloStore(final String ryaInstanceName, + final InstanceType type, final boolean isParent, + final String username, final String password, final String tablePrefix, + final String auths, final String zookeepers) throws Exception { + final AccumuloInstanceDriver aInstance = new AccumuloInstanceDriver( + ryaInstanceName+"_driver", type, true, false, isParent, username, + password, ryaInstanceName, tablePrefix, auths, zookeepers); + aInstance.setUp(); + final AccumuloRyaDAO dao = aInstance.getDao(); + return new AccumuloRyaStatementStore(dao, tablePrefix, ryaInstanceName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java new file mode 100644 index 0000000..f3d523f --- /dev/null +++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.export.client.merge; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.export.api.MergerException; +import org.apache.rya.export.api.StatementMerger; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; + +/** + * Merges two statements together at the visibility. Two statements can be + * merged if the subject, predicate, and object are the same. A merged statement + * has an unchanged subject, predicate, and object but a joined visiblity. + * The visibilities are joined with a logical AND, so a statement with + * visiblity 'A' and another statement with visibility 'B' will be merged + * to have visibility 'A&B' + */ +public class VisibilityStatementMerger implements StatementMerger { + @Override + public Optional<RyaStatement> merge(final Optional<RyaStatement> parent, final Optional<RyaStatement> child) + throws MergerException { + if(parent.isPresent()) { + final RyaStatement parentStatement = parent.get(); + if(child.isPresent()) { + final RyaStatement childStatement = child.get(); + final String pVis = new String(parentStatement.getColumnVisibility()); + final String cVis = new String(childStatement.getColumnVisibility()); + String visibility = ""; + final Joiner join = Joiner.on(")&("); + if(pVis.isEmpty() || cVis.isEmpty()) { + visibility = (pVis + cVis).trim(); + } else { + visibility = "(" + join.join(pVis, cVis) + ")"; + } + parentStatement.setColumnVisibility(visibility.getBytes()); + return Optional.of(parentStatement); + } + return parent; + } else if(child.isPresent()) { + return child; + } + return Optional.absent(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.integration/pom.xml b/extras/rya.export/export.integration/pom.xml index 6f64354..d5c273a 100644 --- a/extras/rya.export/export.integration/pom.xml +++ b/extras/rya.export/export.integration/pom.xml @@ -25,7 +25,7 @@ under the License. <parent> <groupId>org.apache.rya</groupId> <artifactId>rya.export.parent</artifactId> - <version>3.2.10-SNAPSHOT</version> + <version>3.2.11-incubating-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> @@ -39,26 +39,21 @@ under the License. <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.api</artifactId> - <version>3.2.10-SNAPSHOT</version> + <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.export.api</artifactId> - <version>3.2.10-SNAPSHOT</version> + <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.export.client</artifactId> - <version>3.2.10-SNAPSHOT</version> + <version>${project.version}</version> </dependency> <!-- Testing dependencies. --> <dependency> - <groupId>io.fluo</groupId> - <artifactId>fluo-mini</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.integration/src/test/java/org/apache/rya/indexing/export/ITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.integration/src/test/java/org/apache/rya/indexing/export/ITBase.java b/extras/rya.export/export.integration/src/test/java/org/apache/rya/indexing/export/ITBase.java new file mode 100644 index 0000000..e7f020d --- /dev/null +++ b/extras/rya.export/export.integration/src/test/java/org/apache/rya/indexing/export/ITBase.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.export; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.log4j.Logger; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.junit.AfterClass; +import org.openrdf.model.Statement; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; + +import com.mongodb.MongoClient; +import com.mongodb.MongoException; + +import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory; + +/** + * Integration tests that ensure the import/export process runs correctly. + * <p> + * This class is being ignored because it doesn't contain any unit tests. + */ +public abstract class ITBase { + private static final Logger log = Logger.getLogger(ITBase.class); + + protected static final String RYA_TABLE_PREFIX = "demo_"; + + protected static final String USER = "root"; + protected static final String PASSWORD = "password"; + + protected static final String MONGO_USER = "testUser"; + protected static final String MONGO_PASSWORD = "testPSWD"; + + // Rya data store and connections. + protected static List<RyaSailRepository> ryaRepos = new ArrayList<>(); + protected static List<RepositoryConnection> ryaConns = new ArrayList<>(); + + // Rya mongo configs + protected static Map<MongoClient, MongoDBRdfConfiguration> configs = new HashMap<>(); + + // Test Mongos + protected static List<MongoClient> clients = new ArrayList<>(); + + /** + * @return A new {@link MongoClient}. Note: This does not have RYA installed. + * @throws MongoException + * @throws InferenceEngineException + * @throws RyaDAOException + * @throws AccumuloSecurityException + * @throws AccumuloException + * @throws RepositoryException + * @throws NumberFormatException + * @throws IOException + * @throws SailException + */ + public static MongoClient getNewMongoResources(final String ryaInstanceName) throws MongoException, NumberFormatException, RepositoryException, AccumuloException, AccumuloSecurityException, RyaDAOException, InferenceEngineException, IOException, SailException { + // Initialize the test mongo that will be used to host rya. + final MongodForTestsFactory mongodTestFactory = new MongodForTestsFactory(); + final MongoClient newClient = mongodTestFactory.newMongo(); + clients.add(newClient); + final String host = newClient.getAddress().getHost(); + final int port = newClient.getAddress().getPort(); + final RyaSailRepository newRepo = setupRya(ryaInstanceName, host, port, newClient); + ryaRepos.add(newRepo); + return newClient; + } + + @AfterClass + public static void shutdownMiniResources() throws RepositoryException { + for(final RyaSailRepository repo : ryaRepos) { + repo.shutDown(); + } + for(final RepositoryConnection conn : ryaConns) { + conn.close(); + } + for(final MongoClient client : clients) { + client.close(); + } + ryaRepos.clear(); + ryaConns.clear(); + clients.clear(); + } + + /** + * A helper fuction for creating a {@link BindingSet} from an array of + * {@link Binding}s. + * + * @param bindings + * - The bindings to include in the set. (not null) + * @return A {@link BindingSet} holding the bindings. + */ + protected static BindingSet makeBindingSet(final Binding... bindings) { + final MapBindingSet bindingSet = new MapBindingSet(); + for (final Binding binding : bindings) { + bindingSet.addBinding(binding); + } + return bindingSet; + } + + /** + * A helper function for creating a {@link RyaStatement} that represents a + * Triple. + * + * @param subject + * - The Subject of the Triple. (not null) + * @param predicate + * - The Predicate of the Triple. (not null) + * @param object + * - The Object of the Triple. (not null) + * @return A Triple as a {@link RyaStatement}. + */ + protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final String object) { + checkNotNull(subject); + checkNotNull(predicate); + checkNotNull(object); + + final RyaStatementBuilder builder = RyaStatement.builder().setSubject(new RyaURI(subject)) + .setPredicate(new RyaURI(predicate)); + + if (object.startsWith("http://")) { + builder.setObject(new RyaURI(object)); + } else { + builder.setObject(new RyaType(object)); + } + builder.setTimestamp(new Date().getTime()); + + return builder.build(); + } + + /** + * A helper function for creating a {@link RyaStatement} that represents a + * Triple. + * + * @param subject + * - The Subject of the Triple. (not null) + * @param predicate + * - The Predicate of the Triple. (not null) + * @param object + * - The Object of the Triple. (not null) + * @return A Triple as a {@link RyaStatement}. + */ + protected static RyaStatement makeRyaStatement(final String subject, final String predicate, final int object) { + checkNotNull(subject); + checkNotNull(predicate); + + return RyaStatement.builder().setSubject(new RyaURI(subject)).setPredicate(new RyaURI(predicate)) + .setObject(new RyaType(XMLSchema.INT, "" + object)).build(); + } + + /** + * A helper function for creating a Sesame {@link Statement} that represents + * a Triple.. + * + * @param subject + * - The Subject of the Triple. (not null) + * @param predicate + * - The Predicate of the Triple. (not null) + * @param object + * - The Object of the Triple. (not null) + * @return A Triple as a {@link Statement}. + */ + protected static Statement makeStatement(final String subject, final String predicate, final String object) { + checkNotNull(subject); + checkNotNull(predicate); + checkNotNull(object); + + final RyaStatement ryaStmt = makeRyaStatement(subject, predicate, object); + return RyaToRdfConversions.convertStatement(ryaStmt); + } + + /** + * Sets up a Rya instance + * + * @param user + * @param password + * @param instanceName + * @param zookeepers + * @param appName + * @return + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws RepositoryException + * @throws RyaDAOException + * @throws NumberFormatException + * @throws UnknownHostException + * @throws InferenceEngineException + * @throws SailException + */ + protected static RyaSailRepository setupRya(final String ryaInstanceName, + final String hostname, final int port, final MongoClient client) + throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException, + NumberFormatException, UnknownHostException, InferenceEngineException, SailException { + checkNotNull(ryaInstanceName); + + // Setup Rya configuration values. + final MongoDBRdfConfiguration conf = getConf(ryaInstanceName, hostname, port); + configs.put(client, conf); + + final Sail sail = RyaSailFactory.getInstance(conf); + final RyaSailRepository ryaRepo = new RyaSailRepository(sail); + return ryaRepo; + } + + protected static MongoDBRdfConfiguration getConf(final String ryaInstanceName, + final String hostname, final int port) { + final MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration(); + conf.setBoolean(ConfigUtils.USE_MONGO, true); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false); + + conf.setTablePrefix(RYA_TABLE_PREFIX); + + conf.setDisplayQueryPlan(true); + + conf.set(ConfigUtils.CLOUDBASE_USER, USER); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, PASSWORD); + + conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, "test"); + conf.set(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya_"); + + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_"); + + conf.setMongoPort(""+port); + conf.setMongoInstance(hostname); + conf.setMongoDBName(ryaInstanceName); + return conf; + } + + protected static MongoDBRdfConfiguration getConf(final MongoClient client) { + return configs.get(client); + } + + /** + * Override this method to provide an output configuration to the Fluo + * application. + * <p> + * Returns an empty map by default. + * + * @return The parameters that will be passed to {@link QueryResultObserver} + * at startup. + */ + protected Map<String, String> makeExportParams() { + return new HashMap<>(); + } +} \ No newline at end of file
