http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
index 5507037..b796a6f 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
@@ -23,7 +23,8 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.log4j.Logger;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
 import com.google.common.base.Optional;
@@ -44,13 +45,13 @@ import com.google.common.base.Optional;
  * 
  * @see ProducerConfig
  */
-public class KafkaBindingSetExporterFactory implements 
IncrementalBindingSetExporterFactory {
+public class KafkaBindingSetExporterFactory implements 
IncrementalResultExporterFactory {
     private static final Logger log = 
Logger.getLogger(KafkaBindingSetExporterFactory.class);
     @Override
-    public Optional<IncrementalBindingSetExporter> build(Context context) 
throws IncrementalExporterFactoryException, ConfigurationException {
-        final KafkaExportParameters exportParams = new 
KafkaExportParameters(context.getObserverConfiguration().toMap());
-        log.debug("KafkaResultExporterFactory.build(): 
params.isExportToKafka()=" + exportParams.isExportToKafka());
-        if (exportParams.isExportToKafka()) {
+    public Optional<IncrementalResultExporter> build(Context context) throws 
IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaBindingSetExporterParameters exportParams = new 
KafkaBindingSetExporterParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaResultExporterFactory.build(): 
params.isExportToKafka()=" + exportParams.getUseKafkaBindingSetExporter());
+        if (exportParams.getUseKafkaBindingSetExporter()) {
             // Setup Kafka connection
             KafkaProducer<String, VisibilityBindingSet> producer = new 
KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
             // Create the exporter

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
new file mode 100644
index 0000000..4550a50
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Preconditions;
+
+
+public class KafkaBindingSetExporterParameters extends 
KafkaExportParameterBase {
+    
+    public static final String CONF_USE_KAFKA_BINDING_SET_EXPORTER = 
"pcj.fluo.export.kafka.bindingset.enabled";
+    public static final String CONF_KAFKA_BINDING_SET_SERIALIZER = 
"pcj.fluo.export.kafka.bindingset.serializer";
+
+    public KafkaBindingSetExporterParameters(final Map<String, String> params) 
{
+        super(params);
+    }
+    
+    /**
+     * Instructs the Fluo application to use the Kafka Binding Set Exporter
+     * and sets the appropriate Key/Value Serializer parameters for writing 
BindingSets to Kafka.
+     * @param useExporter
+     *            - {@code True} if the Fluo application should use the
+     *            {@link KafkaBindingSetExporter}; otherwise {@code false}.
+     */
+    public void setUseKafkaBindingSetExporter(final boolean useExporter) {
+        setBoolean(params, CONF_USE_KAFKA_BINDING_SET_EXPORTER, useExporter);
+    }
+
+    /**
+     * @return {@code True} if the Fluo application should use the {@link 
KafkaBindingSetExporter}; otherwise
+     *         {@code false}. Defaults to {@code false} if no value is present.
+     */
+    public boolean getUseKafkaBindingSetExporter() {
+        return getBoolean(params, CONF_USE_KAFKA_BINDING_SET_EXPORTER, false);
+    }
+    
+    /**
+     * 
+     * @param serializer - Used for Serializing BindingSets pushed to Kafka
+     */
+    public void setKafkaBindingSetSerializer(String serializer) {
+        params.put(CONF_KAFKA_BINDING_SET_SERIALIZER, 
Preconditions.checkNotNull(serializer));
+    }
+    
+    /**
+     * @return - Serializer used for Serializing BindingSets to Kafka
+     */
+    public String getKafkaBindingSetSerializer() {
+        return params.getOrDefault(CONF_KAFKA_BINDING_SET_SERIALIZER, 
KryoVisibilityBindingSetSerializer.class.getName());
+    }
+    
+    @Override
+    public Properties listAllConfig() {
+        Properties props = super.listAllConfig();
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
getKafkaBindingSetSerializer());
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
new file mode 100644
index 0000000..aab3929
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.fluo.api.observer.Observer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
+
+import jline.internal.Preconditions;
+
+/**
+ * Provides read/write functions to the parameters map that is passed into an
+ * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
+ * to PCJ exporting to a kafka topic.
+ * Remember: if doesn't count unless it is added to params
+ */
+
+public class KafkaExportParameterBase extends ParametersBase {
+
+    public KafkaExportParameterBase(final Map<String, String> params) {
+        super(params);
+    }
+
+    /**
+     * Sets the bootstrap servers for reading from and writing to Kafka
+     * @param bootstrapServers - connect string for Kafka brokers
+     */
+    public void setKafkaBootStrapServers(String bootstrapServers) {
+        params.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
Preconditions.checkNotNull(bootstrapServers));
+    }
+    
+    /**
+     * @return Connect string for Kafka servers
+     */
+    public Optional<String> getKafkaBootStrapServers() {
+        return 
Optional.ofNullable(params.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    }
+
+    /**
+     * Add the properties to the params, NOT keeping them separate from the 
other params.
+     * Guaranteed by Properties: Each key and its corresponding value in the 
property list is a string.
+     * 
+     * @param producerConfig
+     */
+    public void addAllProducerConfig(final Properties producerConfig) {
+        for (Object key : producerConfig.keySet().toArray()) {
+            Object value = producerConfig.getProperty(key.toString());
+            this.params.put(key.toString(), value.toString());
+        }
+    }
+
+    /**
+     * Collect all the properties
+     * 
+     * @return all the params (not just kafka producer Configuration) as a 
{@link Properties}
+     */
+    public Properties listAllConfig() {
+        Properties props = new Properties();
+        for (Object key : params.keySet().toArray()) {
+            Object value = params.get(key.toString());
+            props.put(key.toString(), value.toString());
+        }
+        return props;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
deleted file mode 100644
index 347a2e2..0000000
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.fluo.api.observer.Observer;
-import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
-
-/**
- * Provides read/write functions to the parameters map that is passed into an
- * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
- * to PCJ exporting to a kafka topic.
- * Remember: if doesn't count unless it is added to params
- */
-
-public class KafkaExportParameters extends ParametersBase {
-
-    public static final String CONF_EXPORT_TO_KAFKA = 
"pcj.fluo.export.kafka.enabled";
-
-    public KafkaExportParameters(final Map<String, String> params) {
-        super(params);
-    }
-
-    /**
-     * @param isExportToKafka
-     *            - {@code True} if the Fluo application should export
-     *            to Kafka; otherwise {@code false}.
-     */
-    public void setExportToKafka(final boolean isExportToKafka) {
-        setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
-    }
-
-    /**
-     * @return {@code True} if the Fluo application should export to Kafka; 
otherwise
-     *         {@code false}. Defaults to {@code false} if no value is present.
-     */
-    public boolean isExportToKafka() {
-        return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
-    }
-
-    /**
-     * Add the properties to the params, NOT keeping them separate from the 
other params.
-     * Guaranteed by Properties: Each key and its corresponding value in the 
property list is a string.
-     * 
-     * @param producerConfig
-     */
-    public void addAllProducerConfig(final Properties producerConfig) {
-        for (Object key : producerConfig.keySet().toArray()) {
-            Object value = producerConfig.getProperty(key.toString());
-            this.params.put(key.toString(), value.toString());
-        }
-    }
-
-    /**
-     * Collect all the properties
-     * 
-     * @return all the params (not just kafka producer Configuration) as a 
{@link Properties}
-     */
-    public Properties listAllConfig() {
-        Properties props = new Properties();
-        for (Object key : params.keySet().toArray()) {
-            Object value = params.get(key.toString());
-            props.put(key.toString(), value.toString());
-        }
-        return props;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
index fa27b46..da26329 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
@@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
  */
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -26,8 +27,13 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.log4j.Logger;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.api.domain.RyaSubGraph;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+
+import com.google.common.collect.Sets;
+
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
 
 /**
@@ -76,4 +82,14 @@ public class KafkaRyaSubGraphExporter implements 
IncrementalRyaSubGraphExporter
         producer.close(5, TimeUnit.SECONDS);
     }
 
+    @Override
+    public Set<QueryType> getQueryTypes() {
+        return Sets.newHashSet(QueryType.CONSTRUCT);
+    }
+
+    @Override
+    public ExportStrategy getExportStrategy() {
+        return ExportStrategy.KAFKA;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
index 2c1e4c0..60e9294 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
@@ -21,10 +21,9 @@ import org.apache.fluo.api.observer.Observer.Context;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.log4j.Logger;
 import org.apache.rya.api.domain.RyaSubGraph;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
 
 import com.google.common.base.Optional;
 
@@ -33,9 +32,11 @@ import com.google.common.base.Optional;
  * exporting {@link RyaSubGraph}s from the Rya Fluo application to Kafka.
  *
  */
-public class KafkaRyaSubGraphExporterFactory implements 
IncrementalRyaSubGraphExporterFactory {
+public class KafkaRyaSubGraphExporterFactory implements 
IncrementalResultExporterFactory {
 
     private static final Logger log = 
Logger.getLogger(KafkaRyaSubGraphExporterFactory.class);
+    public static final String CONF_USE_KAFKA_SUBGRAPH_EXPORTER = 
"pcj.fluo.export.kafka.subgraph.enabled";
+    public static final String CONF_KAFKA_SUBGRAPH_SERIALIZER = 
"pcj.fluo.export.kafka.subgraph.serializer";
     
     /**
      * Builds a {@link KafkaRyaSubGraphExporter}.
@@ -45,10 +46,10 @@ public class KafkaRyaSubGraphExporterFactory implements 
IncrementalRyaSubGraphEx
      * @throws ConfigurationException
      */
     @Override
-    public Optional<IncrementalRyaSubGraphExporter> build(Context context) 
throws IncrementalExporterFactoryException, ConfigurationException {
-        final KafkaExportParameters exportParams = new 
KafkaExportParameters(context.getObserverConfiguration().toMap());
-        log.debug("KafkaRyaSubGraphExporterFactory.build(): 
params.isExportToKafka()=" + exportParams.isExportToKafka());
-        if (exportParams.isExportToKafka()) {
+    public Optional<IncrementalResultExporter> build(Context context) throws 
IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaSubGraphExporterParameters exportParams = new 
KafkaSubGraphExporterParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaRyaSubGraphExporterFactory.build(): 
params.isExportToKafka()=" + exportParams.getUseKafkaSubgraphExporter());
+        if (exportParams.getUseKafkaSubgraphExporter()) {
             // Setup Kafka connection
             KafkaProducer<String, RyaSubGraph> producer = new 
KafkaProducer<String, RyaSubGraph>(exportParams.listAllConfig());
             // Create the exporter

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java
new file mode 100644
index 0000000..1472fdd
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Preconditions;
+
+
+public class KafkaSubGraphExporterParameters extends KafkaExportParameterBase {
+
+    public static final String CONF_USE_KAFKA_SUBGRAPH_EXPORTER = 
"pcj.fluo.export.kafka.subgraph.enabled";
+    public static final String CONF_KAFKA_SUBGRAPH_SERIALIZER = 
"pcj.fluo.export.kafka.subgraph.serializer";
+    
+    public KafkaSubGraphExporterParameters(final Map<String, String> params) {
+        super(params);
+    }
+    
+    /**
+     * Instructs the Fluo application to use the Kafka BindingSet Exporter
+     * and sets the appropriate Key/Value Serializer parameters for writing 
RyaSubGraphs to Kafka.
+     * @param useExporter
+     *            - {@code True} if the Fluo application should use the
+     *            {@link KafkaRyaSubGraphExporter}; otherwise {@code false}.
+     */
+    public void setUseKafkaSubgraphExporter(final boolean useExporter) {
+        setBoolean(params, CONF_USE_KAFKA_SUBGRAPH_EXPORTER, useExporter);
+    }
+
+    /**
+     * @return {@code True} if the Fluo application should use the {@link 
KafkaRyaSubGraphExporter}; otherwise
+     *         {@code false}. Defaults to {@code false} if no value is present.
+     */
+    public boolean getUseKafkaSubgraphExporter() {
+        return getBoolean(params, CONF_USE_KAFKA_SUBGRAPH_EXPORTER, false);
+    }
+    
+    /**
+     * 
+     * @param serializer - Used for Serializing RyaSubGraphs pushed to Kafka
+     */
+    public void setKafkaSubGraphSerializer(String serializer) {
+        params.put(CONF_KAFKA_SUBGRAPH_SERIALIZER, 
Preconditions.checkNotNull(serializer));
+    }
+    
+    /**
+     * @return - Serializer used for Serializing RyaSubGraphs to Kafka
+     */
+    public String getKafkaSubGraphSerializer() {
+        return params.getOrDefault(CONF_KAFKA_SUBGRAPH_SERIALIZER, 
RyaSubGraphKafkaSerDe.class.getName());
+    }
+    
+    @Override
+    public Properties listAllConfig() {
+        Properties props = super.listAllConfig();
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
getKafkaSubGraphSerializer());
+        return props;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
new file mode 100644
index 0000000..604462b
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.collect.Sets;
+
+public class PeriodicBindingSetExporter implements 
IncrementalBindingSetExporter {
+
+    private PeriodicQueryResultStorage periodicStorage;
+    
+    /**
+     * Constructs an instance of {@link PeriodicBindingSetExporter}.
+     *
+     * @param pcjStorage - The PCJ storage the new results will be exported 
to. (not null)
+     */
+    public PeriodicBindingSetExporter(PeriodicQueryResultStorage 
periodicStorage) {
+        this.periodicStorage = checkNotNull(periodicStorage);
+    }
+    
+    @Override
+    public Set<QueryType> getQueryTypes() {
+        return Sets.newHashSet(QueryType.PERIODIC);
+    }
+
+    @Override
+    public ExportStrategy getExportStrategy() {
+        return ExportStrategy.RYA;
+    }
+
+    @Override
+    public void close() throws Exception {
+    }
+
+    @Override
+    public void export(String queryId, VisibilityBindingSet result) throws 
ResultExportException {
+        try {
+            periodicStorage.addPeriodicQueryResults(queryId, 
Collections.singleton(result));
+        } catch (PeriodicQueryStorageException e) {
+            throw new ResultExportException("Could not successfully export the 
BindingSet: " + result, e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java
new file mode 100644
index 0000000..0a0b767
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.fluo.api.observer.Observer.Context;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+
+import com.google.common.base.Optional;
+
+public class PeriodicBindingSetExporterFactory implements 
IncrementalResultExporterFactory {
+
+    @Override
+    public Optional<IncrementalResultExporter> build(Context context) throws 
IncrementalExporterFactoryException, ConfigurationException {
+        checkNotNull(context);
+
+        // Wrap the context's parameters for parsing.
+        final RyaExportParameters params = new RyaExportParameters( 
context.getObserverConfiguration().toMap() );
+
+        if(params.getUsePeriodicBindingSetExporter()) {
+            // Setup Zookeeper connection info.
+            final String accumuloInstance = 
params.getAccumuloInstanceName().get();
+            final String zookeeperServers =  
params.getZookeeperServers().get().replaceAll(";", ",");
+            final Instance inst = new ZooKeeperInstance(accumuloInstance, 
zookeeperServers);
+
+            try {
+                // Setup Accumulo connection info.
+                final String exporterUsername = 
params.getExporterUsername().get();
+                final String exporterPassword = 
params.getExporterPassword().get();
+                final Connector accumuloConn = 
inst.getConnector(exporterUsername, new PasswordToken(exporterPassword));
+
+                // Setup Rya PCJ Storage.
+                final String ryaInstanceName = 
params.getRyaInstanceName().get();
+                final PeriodicQueryResultStorage periodicStorage = new 
AccumuloPeriodicQueryResultStorage(accumuloConn, ryaInstanceName);
+                
+                // Make the exporter.
+                final IncrementalBindingSetExporter exporter = new 
PeriodicBindingSetExporter(periodicStorage);
+                return Optional.of(exporter);
+
+            } catch (final AccumuloException | AccumuloSecurityException e) {
+                throw new IncrementalExporterFactoryException("Could not 
initialize the Accumulo connector using the provided configuration.", e);
+            }
+        } else {
+            return Optional.absent();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
index 54c39b7..8a9dbe4 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
@@ -22,55 +22,42 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Objects.requireNonNull;
 
 import java.util.Collections;
+import java.util.Set;
 
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 
+import com.google.common.collect.Sets;
+
 /**
  * Incrementally exports SPARQL query results to Accumulo PCJ tables as they 
are defined by Rya.
  */
 public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
 
     private final PrecomputedJoinStorage pcjStorage;
-    private final PeriodicQueryResultStorage periodicStorage;
 
     /**
      * Constructs an instance of {@link RyaBindingSetExporter}.
      *
      * @param pcjStorage - The PCJ storage the new results will be exported 
to. (not null)
      */
-    public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage, 
PeriodicQueryResultStorage periodicStorage) {
+    public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) {
         this.pcjStorage = checkNotNull(pcjStorage);
-        this.periodicStorage = checkNotNull(periodicStorage);
     }
 
     @Override
-    public void export(
-            final TransactionBase fluoTx,
-            final String queryId,
-            final VisibilityBindingSet result) throws ResultExportException {
-        requireNonNull(fluoTx);
+    public void export(final String queryId, final VisibilityBindingSet 
result) throws ResultExportException {
         requireNonNull(queryId);
         requireNonNull(result);
 
-        // Look up the ID the PCJ represents within the PCJ Storage.
-        final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
-
         try {
-            if (result.hasBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID)) 
{
-                periodicStorage.addPeriodicQueryResults(pcjId, 
Collections.singleton(result));
-            } else {
-                pcjStorage.addResults(pcjId, Collections.singleton(result));
-            }
-        } catch (final PCJStorageException | PeriodicQueryStorageException e) {
-            throw new ResultExportException("A result could not be exported to 
Rya.", e);
+            pcjStorage.addResults(queryId, Collections.singleton(result));
+        } catch (PCJStorageException e) {
+            throw new ResultExportException("Unable to successfully export the 
result: " + result, e);
         }
     }
 
@@ -78,4 +65,14 @@ public class RyaBindingSetExporter implements 
IncrementalBindingSetExporter {
     public void close() throws Exception {
         pcjStorage.close();
     }
+
+    @Override
+    public Set<QueryType> getQueryTypes() {
+        return Sets.newHashSet(QueryType.PROJECTION);
+    }
+
+    @Override
+    public ExportStrategy getExportStrategy() {
+        return ExportStrategy.RYA;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
index 82ce9c6..a87243e 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
@@ -26,8 +26,10 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.fluo.api.observer.Observer.Context;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
@@ -35,21 +37,19 @@ import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultS
 
 import com.google.common.base.Optional;
 
-import org.apache.fluo.api.observer.Observer.Context;
-
 /**
  * Creates instances of {@link RyaBindingSetExporter}.
  */
-public class RyaBindingSetExporterFactory implements 
IncrementalBindingSetExporterFactory {
+public class RyaBindingSetExporterFactory implements 
IncrementalResultExporterFactory {
 
     @Override
-    public Optional<IncrementalBindingSetExporter> build(final Context 
context) throws IncrementalExporterFactoryException, ConfigurationException {
+    public Optional<IncrementalResultExporter> build(final Context context) 
throws IncrementalExporterFactoryException, ConfigurationException {
         checkNotNull(context);
 
         // Wrap the context's parameters for parsing.
         final RyaExportParameters params = new RyaExportParameters( 
context.getObserverConfiguration().toMap() );
 
-        if(params.isExportToRya()) {
+        if(params.getUseRyaBindingSetExporter()) {
             // Setup Zookeeper connection info.
             final String accumuloInstance = 
params.getAccumuloInstanceName().get();
             final String zookeeperServers =  
params.getZookeeperServers().get().replaceAll(";", ",");
@@ -64,10 +64,9 @@ public class RyaBindingSetExporterFactory implements 
IncrementalBindingSetExport
                 // Setup Rya PCJ Storage.
                 final String ryaInstanceName = 
params.getRyaInstanceName().get();
                 final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, ryaInstanceName);
-                final PeriodicQueryResultStorage periodicStorage = new 
AccumuloPeriodicQueryResultStorage(accumuloConn, ryaInstanceName);
                 
                 // Make the exporter.
-                final IncrementalBindingSetExporter exporter = new 
RyaBindingSetExporter(pcjStorage, periodicStorage);
+                final IncrementalBindingSetExporter exporter = new 
RyaBindingSetExporter(pcjStorage);
                 return Optional.of(exporter);
 
             } catch (final AccumuloException | AccumuloSecurityException e) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
index a1ba5b8..aa5d3cd 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
@@ -38,7 +38,8 @@ import org.apache.fluo.api.observer.Observer;
 @DefaultAnnotation(NonNull.class)
 public class RyaExportParameters extends ParametersBase {
 
-    public static final String CONF_EXPORT_TO_RYA = 
"pcj.fluo.export.rya.enabled";
+    public static final String CONF_USE_RYA_BINDING_SET_EXPORTER = 
"pcj.fluo.export.rya.bindingset.enabled";
+    public static final String CONF_USE_PERIODIC_BINDING_SET_EXPORTER = 
"pcj.fluo.export.periodic.bindingset.enabled";
     public static final String CONF_ACCUMULO_INSTANCE_NAME = 
"pcj.fluo.export.rya.accumuloInstanceName";
     public static final String CONF_ZOOKEEPER_SERVERS = 
"pcj.fluo.export.rya.zookeeperServers";
     public static final String CONF_EXPORTER_USERNAME = 
"pcj.fluo.export.rya.exporterUsername";
@@ -57,19 +58,35 @@ public class RyaExportParameters extends ParametersBase {
     }
 
     /**
-     * @param isExportToRya - {@code True} if the Fluo application should 
export
-     *   to Rya; otherwise {@code false}.
+     * @param useExporter - {@code True} if the Fluo application should use 
the {@link RyaBindingSetExporter}; otherwise
+     *            {@code false}.
      */
-    public void setExportToRya(final boolean isExportToRya) {
-        setBoolean(params, CONF_EXPORT_TO_RYA, isExportToRya);
+    public void setUseRyaBindingSetExporter(final boolean useExporter) {
+        setBoolean(params, CONF_USE_RYA_BINDING_SET_EXPORTER, useExporter);
     }
 
     /**
-     * @return {@code True} if the Fluo application should export to Rya; 
otherwise
+     * @return {@code True} if the Fluo application should use the {@link 
RyaBindingSetExporter}; otherwise
      *   {@code false}. Defaults to {@code false} if no value is present.
      */
-    public boolean isExportToRya() {
-        return getBoolean(params, CONF_EXPORT_TO_RYA, false);
+    public boolean getUseRyaBindingSetExporter() {
+        return getBoolean(params, CONF_USE_RYA_BINDING_SET_EXPORTER, false);
+    }
+    
+    /**
+     * @param useExporter - {@code True} if the Fluo application should use the
+     *            {@link PeriodicBindingSetExporter}; otherwise {@code false}.
+     */
+    public void setUsePeriodicBindingSetExporter(final boolean useExporter) {
+        setBoolean(params, CONF_USE_PERIODIC_BINDING_SET_EXPORTER, 
useExporter);
+    }
+
+    /**
+     * @return {@code True} if the Fluo application should use the {@link 
PeriodicBindingSetExporter}; otherwise
+     *         {@code false}. Defaults to {@code false} if no value is present.
+     */
+    public boolean getUsePeriodicBindingSetExporter() {
+        return getBoolean(params, CONF_USE_PERIODIC_BINDING_SET_EXPORTER, 
false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java
new file mode 100644
index 0000000..6a99a7e
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * This class manages the parameters used to construct the RyaSubGraphExporter.
+ *
+ */
+public class RyaSubGraphExportParameters extends RyaExportParameters {
+
+    public static final String CONF_FLUO_INSTANCE = 
"pcj.fluo.export.rya.fluo.instance";
+    public static final String CONF_FLUO_INSTANCE_ZOOKEEPERS = 
"pcj.fluo.export.rya.fluo.instance.zookeepers";
+    public static final String CONF_FLUO_TABLE_NAME = 
"pcj.fluo.export.rya.fluo.table.name";
+    public static final String CONF_USE_RYA_SUBGRAPH_EXPORTER = 
"pcj.fluo.export.rya.subgraph.enabled";
+    
+    
+    public RyaSubGraphExportParameters(Map<String, String> params) {
+        super(params);
+    }
+    
+    /**
+     * @param useExporter - indicates whether to use the {@link 
RyaSubGraphExporter}
+     */
+    public void setUseRyaSubGraphExporter(boolean useExporter) {
+        setBoolean(params, CONF_USE_RYA_SUBGRAPH_EXPORTER, useExporter);
+    }
+    
+    /**
+     * @return boolean indicating whether to use the {@link 
RyaSubGraphExporter}
+     */
+    public boolean getUseRyaSubGraphExporter() {
+        return getBoolean(params, CONF_USE_RYA_SUBGRAPH_EXPORTER, false);
+    }
+    
+    /**
+     * @param fluoInstance - the Accumulo instance that Fluo is running on
+     */
+    public void setFluoInstanceName(String fluoInstance) {
+        params.put(CONF_FLUO_INSTANCE, 
Preconditions.checkNotNull(fluoInstance));
+    }
+    
+    /**
+     * @return the Accumulo instance that Fluo is running on
+     */
+    public Optional<String> getFluoInstanceName() {
+        return Optional.ofNullable(params.get(CONF_FLUO_INSTANCE));
+    }
+    
+    /**
+     * @param fluoTable - the name of the Accumulo Fluo table
+     */
+    public void setFluoTable(@Nullable String fluoTable) {
+        params.put(CONF_FLUO_TABLE_NAME, fluoTable);
+    }
+    
+    /**
+     * @return the name of the Accumulo Fluo table
+     */
+    public Optional<String> getFluoTable() {
+        return Optional.ofNullable(params.get(CONF_FLUO_TABLE_NAME));
+    }
+    
+    /**
+     * @param zookeepers - the zookeepers for the Fluo instance
+     */
+    public void setFluoZookeepers(@Nullable String zookeepers) {
+        params.put(CONF_FLUO_INSTANCE_ZOOKEEPERS, zookeepers);
+    }
+    
+    /**
+     * @return - the zookeepers for the Fluo instance
+     */
+    public Optional<String> getFLuoZookeepers() {
+        return Optional.ofNullable(params.get(CONF_FLUO_INSTANCE_ZOOKEEPERS));
+    }
+    
+    /**
+     * Uses underlying parameter map to build a FluoConfiguration object
+     * @return - FluoConfiguration for creating a FluoClient
+     */
+    public FluoConfiguration getFluoConfiguration() {
+        final FluoConfiguration config = new FluoConfiguration();
+        config.setMiniStartAccumulo(false);
+        config.setAccumuloInstance(params.get(CONF_ACCUMULO_INSTANCE_NAME));
+        config.setAccumuloUser(params.get(CONF_EXPORTER_USERNAME));
+        config.setAccumuloPassword(params.get(CONF_EXPORTER_PASSWORD));
+        
config.setInstanceZookeepers(params.get(CONF_FLUO_INSTANCE_ZOOKEEPERS));
+        config.setAccumuloZookeepers(params.get(CONF_ZOOKEEPER_SERVERS));
+
+        config.setApplicationName(params.get(CONF_FLUO_APP_NAME));
+        config.setAccumuloTable(params.get(CONF_FLUO_TABLE_NAME));
+        return config;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
new file mode 100644
index 0000000..e33ea97
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * This exporter is used to import {@link RyaSubGraph}s back into Fluo. By 
ingesting
+ * RyaSubGraphs back into Fluo, queries can be chained together.
+ *
+ */
+public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
+
+    private static final Logger log = 
Logger.getLogger(RyaSubGraphExporter.class);
+    private static final WholeRowTripleResolver TRIPLE_RESOLVER = new 
WholeRowTripleResolver();
+    private final FluoClient fluo;
+    
+    public RyaSubGraphExporter(FluoClient fluo) {
+        this.fluo = Preconditions.checkNotNull(fluo);
+    }
+    
+    @Override
+    public Set<QueryType> getQueryTypes() {
+        return Sets.newHashSet(QueryType.CONSTRUCT);
+    }
+
+    @Override
+    public ExportStrategy getExportStrategy() {
+        return ExportStrategy.RYA;
+    }
+
+    @Override
+    public void close() throws Exception {
+        fluo.close();
+    }
+
+    @Override
+    public void export(String constructID, RyaSubGraph subgraph) throws 
ResultExportException {
+        insertTriples(fluo.newTransaction(), subgraph.getStatements());
+    }
+    
+    private void insertTriples(TransactionBase tx, final 
Collection<RyaStatement> triples) {
+        for (final RyaStatement triple : triples) {
+            Optional<byte[]> visibility = 
Optional.fromNullable(triple.getColumnVisibility());
+            try {
+                tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, 
Bytes.of(visibility.or(new byte[0])));
+            } catch (final TripleRowResolverException e) {
+                log.error("Could not convert a Triple into the SPO format: " + 
triple);
+            }
+        }
+    }
+
+    /**
+     * Converts a triple into a byte[] holding the Rya SPO representation of 
it.
+     *
+     * @param triple - The triple to convert. (not null)
+     * @return The Rya SPO representation of the triple.
+     * @throws TripleRowResolverException The triple could not be converted.
+     */
+    private static byte[] spoFormat(final RyaStatement triple) throws 
TripleRowResolverException {
+        checkNotNull(triple);
+        final Map<TABLE_LAYOUT, TripleRow> serialized = 
TRIPLE_RESOLVER.serialize(triple);
+        final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
+        return spoRow.getRow();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java
new file mode 100644
index 0000000..25f60a5
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/**
+ * Factory class for building {@link RyaSubGraphExporter}s.
+ *
+ */
+public class RyaSubGraphExporterFactory implements 
IncrementalResultExporterFactory {
+
+    @Override
+    public Optional<IncrementalResultExporter> build(Context context) throws 
IncrementalExporterFactoryException, ConfigurationException {
+        Preconditions.checkNotNull(context);
+        
+        RyaSubGraphExportParameters params = new 
RyaSubGraphExportParameters(context.getObserverConfiguration().toMap());
+
+        if (params.getUseRyaSubGraphExporter()) {
+            try {
+                //Get FluoConfiguration from params
+                FluoConfiguration conf = params.getFluoConfiguration();
+                FluoClient fluo = FluoFactory.newClient(conf);
+                
+                //Create exporter
+                RyaSubGraphExporter exporter = new RyaSubGraphExporter(fluo);
+                return Optional.of(exporter);
+            } catch (Exception e) {
+                throw new IncrementalExporterFactoryException("Could not 
initialize the RyaSubGraphExporter", e);
+            }
+        }
+        return Optional.absent();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
index 1cb1594..6147fa8 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
@@ -28,7 +28,6 @@ import 
org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.ObjectSeria
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
 import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.openrdf.query.BindingSet;
 
@@ -45,8 +44,6 @@ public class AggregationObserver extends BindingSetUpdater {
 
     private static final AggregationStateSerDe STATE_SERDE = new 
ObjectSerializationAggregationStateSerDe();
 
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
     @Override
     public ObservedColumn getObservedColumn() {
         return new ObservedColumn(FluoQueryColumns.AGGREGATION_BINDING_SET, 
NotificationType.STRONG);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index 7d0fd5e..c0cfa1d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -55,7 +55,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 public abstract class BindingSetUpdater extends AbstractObserver {
     private static final Logger log = 
Logger.getLogger(BindingSetUpdater.class);
     // DAO
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+    protected final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
     // Updaters
     private final JoinResultUpdater joinUpdater = new JoinResultUpdater();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
index f0fef07..61e7244 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -1,4 +1,3 @@
-package org.apache.rya.indexing.pcj.fluo.app.observers;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,54 +16,20 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
  * specific language governing permissions and limitations
  * under the License.
  */
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.io.UnsupportedEncodingException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.utils.VisibilitySimplifier;
-import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
 import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaSubGraph;
-import org.apache.rya.api.resolver.triple.TripleRow;
-import org.apache.rya.api.resolver.triple.TripleRowResolverException;
-import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
 /**
  * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
  * Construct Query {@link RyaStatement}s and exports the results using the
@@ -74,49 +39,7 @@ import com.google.common.collect.ImmutableSet;
  */
 public class ConstructQueryResultObserver extends AbstractObserver {
 
-    private static final WholeRowTripleResolver TRIPLE_RESOLVER = new 
WholeRowTripleResolver();
     private static final Logger log = 
Logger.getLogger(ConstructQueryResultObserver.class);
-    private static final RyaSubGraphKafkaSerDe serializer = new 
RyaSubGraphKafkaSerDe();
-
-    /**
-     * We expect to see the same expressions a lot, so we cache the simplified
-     * forms.
-     */
-    private final Map<String, String> simplifiedVisibilities = new HashMap<>();
-
-    /**
-     * Builders for each type of result exporter we support.
-     */
-    private static final ImmutableSet<IncrementalRyaSubGraphExporterFactory> 
factories = ImmutableSet
-            .<IncrementalRyaSubGraphExporterFactory> builder().add(new 
KafkaRyaSubGraphExporterFactory()).build();
-
-    /**
-     * The exporters that are configured.
-     */
-    private ImmutableSet<IncrementalRyaSubGraphExporter> exporters = null;
-
-    /**
-     * Before running, determine which exporters are configured and set them 
up.
-     */
-    @Override
-    public void init(final Context context) {
-        final ImmutableSet.Builder<IncrementalRyaSubGraphExporter> 
exportersBuilder = ImmutableSet.builder();
-
-        for (final IncrementalRyaSubGraphExporterFactory builder : factories) {
-            try {
-                log.debug("ConstructQueryResultObserver.init(): for each 
exportersBuilder=" + builder);
-
-                final Optional<IncrementalRyaSubGraphExporter> exporter = 
builder.build(context);
-                if (exporter.isPresent()) {
-                    exportersBuilder.add(exporter.get());
-                }
-            } catch (final IncrementalExporterFactoryException e) {
-                log.error("Could not initialize a result exporter.", e);
-            }
-        }
-
-        exporters = exportersBuilder.build();
-    }
 
     @Override
     public ObservedColumn getObservedColumn() {
@@ -125,74 +48,20 @@ public class ConstructQueryResultObserver extends 
AbstractObserver {
 
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) throws 
Exception {
+        
+        //Build row for parent that result will be written to
+        BindingSetRow bsRow = BindingSetRow.make(row);
+        String constructNodeId = bsRow.getNodeId();
+        String bsString= bsRow.getBindingSetString();
+        String parentNodeId = tx.get(Bytes.of(constructNodeId), 
FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
+        String rowString = parentNodeId + 
IncrementalUpdateConstants.NODEID_BS_DELIM + bsString;
+        
+        //Get NodeType of the parent node
+        NodeType parentType = NodeType.fromNodeId(parentNodeId).get();
+        //Get data for the ConstructQuery result
         Bytes bytes = tx.get(row, col);
-        RyaSubGraph subgraph = serializer.fromBytes(bytes.toArray());
-        Set<RyaStatement> statements = subgraph.getStatements();
-        if (statements.size() > 0) {
-            byte[] visibility = 
statements.iterator().next().getColumnVisibility();
-            visibility = simplifyVisibilities(visibility);
-            for(RyaStatement statement: statements) {
-                statement.setColumnVisibility(visibility);
-            }
-            subgraph.setStatements(statements);
-
-            for (IncrementalRyaSubGraphExporter exporter : exporters) {
-                exporter.export(row.toString(), subgraph);
-            }
-        }
-        //add generated triples back into Fluo for chaining queries together
-        insertTriples(tx, subgraph.getStatements());
-    }
-    
-    @Override
-    public void close() {
-        if(exporters != null) {
-            for(final IncrementalRyaSubGraphExporter exporter : exporters) {
-                try {
-                    exporter.close();
-                } catch(final Exception e) {
-                    log.warn("Problem encountered while closing one of the 
exporters.", e);
-                }
-            }
-        }
-    }
-
-    private byte[] simplifyVisibilities(byte[] visibilityBytes) throws 
UnsupportedEncodingException {
-        // Simplify the result's visibilities and cache new simplified
-        // visibilities
-        String visibility = new String(visibilityBytes, "UTF-8");
-        if (!simplifiedVisibilities.containsKey(visibility)) {
-            String simplified = VisibilitySimplifier.simplify(visibility);
-            simplifiedVisibilities.put(visibility, simplified);
-        }
-        return simplifiedVisibilities.get(visibility).getBytes("UTF-8");
+        //Write result to parent
+        tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes);
     }
-    
-    private void insertTriples(TransactionBase tx, final 
Collection<RyaStatement> triples) {
-
-        for (final RyaStatement triple : triples) {
-            Optional<byte[]> visibility = 
Optional.fromNullable(triple.getColumnVisibility());
-            try {
-                tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, 
Bytes.of(visibility.or(new byte[0])));
-            } catch (final TripleRowResolverException e) {
-                log.error("Could not convert a Triple into the SPO format: " + 
triple);
-            }
-        }
-    }
-    
-
-    /**
-     * Converts a triple into a byte[] holding the Rya SPO representation of 
it.
-     *
-     * @param triple - The triple to convert. (not null)
-     * @return The Rya SPO representation of the triple.
-     * @throws TripleRowResolverException The triple could not be converted.
-     */
-    public static byte[] spoFormat(final RyaStatement triple) throws 
TripleRowResolverException {
-        checkNotNull(triple);
-        final Map<TABLE_LAYOUT, TripleRow> serialized = 
TRIPLE_RESOLVER.serialize(triple);
-        final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
-        return spoRow.getRow();
-    }
-
+   
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index ee03334..b4edfea 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import org.openrdf.query.BindingSet;
@@ -39,8 +38,6 @@ public class FilterObserver extends BindingSetUpdater {
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
     @Override
     public ObservedColumn getObservedColumn() {
         return new ObservedColumn(FluoQueryColumns.FILTER_BINDING_SET, 
NotificationType.STRONG);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index 28e31d8..c56a98f 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -24,7 +24,6 @@ import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -39,8 +38,6 @@ public class JoinObserver extends BindingSetUpdater {
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
     @Override
     public ObservedColumn getObservedColumn() {
         return new ObservedColumn(FluoQueryColumns.JOIN_BINDING_SET, 
NotificationType.STRONG);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
index e7072e7..7d96baa 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
 import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -42,7 +41,6 @@ import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 public class PeriodicQueryObserver extends BindingSetUpdater {
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
     @Override
     public ObservedColumn getObservedColumn() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
index b712606..5d73b2e 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -37,7 +36,6 @@ public class ProjectionObserver extends BindingSetUpdater {
     private static final Logger log = 
Logger.getLogger(ProjectionObserver.class);
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
 
     @Override
     public ObservedColumn getObservedColumn() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index e6368ba..ba7beee 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -20,24 +20,24 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
 
 import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.AbstractObserver;
 import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
@@ -46,28 +46,23 @@ import com.google.common.collect.ImmutableSet;
  * Performs incremental result exporting to the configured destinations.
  */
 public class QueryResultObserver extends AbstractObserver {
+    
     private static final Logger log = 
Logger.getLogger(QueryResultObserver.class);
-
-    private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
-
-    /**
-     * We expect to see the same expressions a lot, so we cache the simplified 
forms.
-     */
-    private final Map<String, String> simplifiedVisibilities = new HashMap<>();
-
+    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+    
     /**
-     * Builders for each type of result exporter we support.
+     * Builders for each type of {@link IncrementalBindingSetExporter} we 
support.
      */
-    private static final ImmutableSet<IncrementalBindingSetExporterFactory> 
factories =
-            ImmutableSet.<IncrementalBindingSetExporterFactory>builder()
+    private static final ImmutableSet<IncrementalResultExporterFactory> 
factories =
+            ImmutableSet.<IncrementalResultExporterFactory>builder()
                 .add(new RyaBindingSetExporterFactory())
                 .add(new KafkaBindingSetExporterFactory())
+                .add(new KafkaRyaSubGraphExporterFactory())
+                .add(new RyaSubGraphExporterFactory())
+                .add(new PeriodicBindingSetExporterFactory())
                 .build();
-
-    /**
-     * The exporters that are configured.
-     */
-    private ImmutableSet<IncrementalBindingSetExporter> exporters = null;
+    
+    private ExporterManager exporterManager;
 
     @Override
     public ObservedColumn getObservedColumn() {
@@ -79,63 +74,46 @@ public class QueryResultObserver extends AbstractObserver {
      */
     @Override
     public void init(final Context context) {
-        final ImmutableSet.Builder<IncrementalBindingSetExporter> 
exportersBuilder = ImmutableSet.builder();
-
-        for(final IncrementalBindingSetExporterFactory builder : factories) {
+        
+        ExporterManager.Builder managerBuilder = ExporterManager.builder();
+        
+        for(final IncrementalResultExporterFactory builder : factories) {
             try {
                 log.debug("QueryResultObserver.init(): for each 
exportersBuilder=" + builder);
 
-                final Optional<IncrementalBindingSetExporter> exporter = 
builder.build(context);
+                final Optional<IncrementalResultExporter> exporter = 
builder.build(context);
                 if(exporter.isPresent()) {
-                    exportersBuilder.add(exporter.get());
+                    
managerBuilder.addIncrementalResultExporter(exporter.get());
                 }
             } catch (final IncrementalExporterFactoryException e) {
                 log.error("Could not initialize a result exporter.", e);
             }
         }
-
-        exporters = exportersBuilder.build();
+        
+        exporterManager = managerBuilder.build();
     }
+    
 
     @Override
     public void process(final TransactionBase tx, final Bytes brow, final 
Column col) throws Exception {
         final String row = brow.toString();
 
-        // Read the SPARQL query and it Binding Set from the row id.
+        // Read the queryId from the row and get the QueryMetadata.
         final String queryId = row.split(NODEID_BS_DELIM)[0];
+        final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId);
 
         // Read the Child Binding Set that will be exported.
         final Bytes valueBytes = tx.get(brow, col);
-        final VisibilityBindingSet result = BS_SERDE.deserialize(valueBytes);
         
-        // Simplify the result's visibilities.
-        final String visibility = result.getVisibility();
-        if(!simplifiedVisibilities.containsKey(visibility)) {
-            final String simplified = VisibilitySimplifier.simplify( 
visibility );
-            simplifiedVisibilities.put(visibility, simplified);
-        }
-        result.setVisibility( simplifiedVisibilities.get(visibility) );
-
-        // Export the result using each of the provided exporters.
-        for(final IncrementalBindingSetExporter exporter : exporters) {
-            try {
-                exporter.export(tx, queryId, result);
-            } catch (final ResultExportException e) {
-                log.error("Could not export a binding set for query '" + 
queryId + "'. Binding Set: " + result, e);
-            }
-        }
+        exporterManager.export(metadata.getQueryType(), 
metadata.getExportStrategies(), queryId, valueBytes);
     }
 
     @Override
     public void close() {
-        if(exporters != null) {
-            for(final IncrementalBindingSetExporter exporter : exporters) {
-                try {
-                    exporter.close();
-                } catch(final Exception e) {
-                    log.warn("Problem encountered while closing one of the 
exporters.", e);
-                }
-            }
+        try {
+            exporterManager.close();
+        } catch (Exception e) {
+           log.warn("Encountered problems closing the ExporterManager.");
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index 69a651e..607267a 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -24,7 +24,6 @@ import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -39,9 +38,6 @@ public class StatementPatternObserver extends 
BindingSetUpdater {
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
-    // DAO
-    private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
     @Override
     public ObservedColumn getObservedColumn() {
         return new 
ObservedColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, 
NotificationType.STRONG);

Reply via email to