Repository: incubator-fluo-recipes
Updated Branches:
  refs/heads/master 42176d1ec -> 573aeb788


Avoid allocating collection in AccumuloExporter


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/commit/573aeb78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/tree/573aeb78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/diff/573aeb78

Branch: refs/heads/master
Commit: 573aeb788a0b4623f10b7afd8b0363b501e31776
Parents: 42176d1
Author: Keith Turner <ktur...@apache.org>
Authored: Wed Oct 12 15:26:47 2016 -0400
Committer: Keith Turner <ke...@deenlo.com>
Committed: Thu Oct 13 16:59:32 2016 -0400

----------------------------------------------------------------------
 docs/accumulo-export-queue.md                   |  4 +--
 .../accumulo/export/AccumuloExporter.java       | 24 +++++++++++-----
 .../accumulo/export/AccumuloReplicator.java     | 12 ++++----
 .../accumulo/export/AccumuloExportTest.java     | 29 +++++++++++++-------
 .../recipes/test/export/SimpleExporter.java     |  7 ++---
 5 files changed, 47 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/573aeb78/docs/accumulo-export-queue.md
----------------------------------------------------------------------
diff --git a/docs/accumulo-export-queue.md b/docs/accumulo-export-queue.md
index 33a1c97..b880d58 100644
--- a/docs/accumulo-export-queue.md
+++ b/docs/accumulo-export-queue.md
@@ -24,10 +24,10 @@ Exporting to Accumulo is easy. Follow the steps below:
     public class SimpleExporter extends AccumuloExporter<String, String> {
 
       @Override
-      protected Collection<Mutation> translate(SequencedExport<String, String> 
export) {
+      protected void translate(SequencedExport<String, String> export, 
Consumer<Mutation> consumer) {
         Mutation m = new Mutation(export.getKey());
         m.put("cf", "cq", export.getSequence(), export.getValue());
-        return Collections.singleton(m);
+        consumer.accept(m);
       }
     }
     ```

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/573aeb78/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
index 4cb0730..6a19362 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
@@ -16,10 +16,10 @@
 package org.apache.fluo.recipes.accumulo.export;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.fluo.api.config.SimpleConfiguration;
@@ -68,9 +68,11 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
 
     ArrayList<Mutation> buffer = new ArrayList<>();
 
+    Consumer<Mutation> consumer = m -> buffer.add(m);
+
     while (exports.hasNext()) {
       SequencedExport<K, V> export = exports.next();
-      buffer.addAll(translate(export));
+      translate(export, consumer);
     }
 
     if (buffer.size() > 0) {
@@ -78,7 +80,14 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
     }
   }
 
-  protected abstract Collection<Mutation> translate(SequencedExport<K, V> 
export);
+  /**
+   * Implementations of this method should translate the given SequencedExport 
to 0 or more
+   * Mutations.
+   * 
+   * @param export the input that should be translated to mutations
+   * @param consumer output mutations to this consumer
+   */
+  protected abstract void translate(SequencedExport<K, V> export, 
Consumer<Mutation> consumer);
 
   /**
    * Generates Accumulo mutations by comparing the differences between a 
RowColumn/Bytes map that is
@@ -95,12 +104,13 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
    * <li>The export sequence number is used for the timestamp in the mutation.
    * </ul>
    *
+   * @param consumer generated mutations will be output to this consumer
    * @param oldData Map containing old row/column data
    * @param newData Map containing new row/column data
    * @param seq Export sequence number
    */
-  public static Collection<Mutation> generateMutations(long seq, 
Map<RowColumn, Bytes> oldData,
-      Map<RowColumn, Bytes> newData) {
+  public static void generateMutations(long seq, Map<RowColumn, Bytes> oldData,
+      Map<RowColumn, Bytes> newData, Consumer<Mutation> consumer) {
     Map<Bytes, Mutation> mutationMap = new HashMap<>();
     for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
       RowColumn rc = entry.getKey();
@@ -120,7 +130,7 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
         m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, 
newVal.toArray());
       }
     }
-    return mutationMap.values();
-  }
 
+    mutationMap.values().forEach(consumer);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/573aeb78/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
index abaecfd..ed16fc7 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
@@ -15,9 +15,9 @@
 
 package org.apache.fluo.recipes.accumulo.export;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 import org.apache.accumulo.core.data.Mutation;
@@ -34,8 +34,8 @@ import org.apache.fluo.recipes.core.transaction.TxLog;
 public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
 
   @Override
-  protected Collection<Mutation> translate(SequencedExport<String, TxLog> 
export) {
-    return generateMutations(export.getSequence(), export.getValue());
+  protected void translate(SequencedExport<String, TxLog> export, 
Consumer<Mutation> consumer) {
+    generateMutations(export.getSequence(), export.getValue(), consumer);
   }
 
   /**
@@ -51,9 +51,9 @@ public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
    *
    * @param txLog Transaction log
    * @param seq Export sequence number
-   * @return Collection of mutations
+   * @param consumer generated mutations will be output to this consumer
    */
-  public static Collection<Mutation> generateMutations(long seq, TxLog txLog) {
+  public static void generateMutations(long seq, TxLog txLog, 
Consumer<Mutation> consumer) {
     Map<Bytes, Mutation> mutationMap = new HashMap<>();
     for (LogEntry le : txLog.getLogEntries()) {
       LogEntry.Operation op = le.getOp();
@@ -78,6 +78,6 @@ public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
         }
       }
     }
-    return mutationMap.values();
+    mutationMap.values().forEach(consumer);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/573aeb78/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
index d0ebaee..53d2013 100644
--- a/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
+++ b/modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
@@ -15,11 +15,13 @@
 
 package org.apache.fluo.recipes.accumulo.export;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.fluo.api.data.Bytes;
@@ -43,9 +45,9 @@ public class AccumuloExportTest {
     return rcMap;
   }
 
-  public static Collection<Mutation> genMutations(String key, long seq, 
Optional<String> oldVal,
-      Optional<String> newVal) {
-    return AccumuloExporter.generateMutations(seq, genData(key, oldVal), 
genData(key, newVal));
+  public static void genMutations(String key, long seq, Optional<String> 
oldVal,
+      Optional<String> newVal, Consumer<Mutation> consumer) {
+    AccumuloExporter.generateMutations(seq, genData(key, oldVal), genData(key, 
newVal), consumer);
   }
 
   public static Mutation makePut(String key, String val, long seq) {
@@ -70,33 +72,40 @@ public class AccumuloExportTest {
 
   @Test
   public void testDifferenceExport() {
-    Collection<Mutation> mutations;
+    final Collection<Mutation> mutations = new ArrayList<>();
+    Consumer<Mutation> consumer = m -> mutations.add(m);
 
-    mutations = genMutations("k1", 1, Optional.empty(), Optional.of("a"));
+    genMutations("k1", 1, Optional.empty(), Optional.of("a"), consumer);
     Assert.assertEquals(1, mutations.size());
     Assert.assertTrue(mutations.contains(makePut("k1", "a", 1)));
+    mutations.clear();
 
-    mutations = genMutations("k2", 2, Optional.of("ab"), Optional.of("ab"));
+    genMutations("k2", 2, Optional.of("ab"), Optional.of("ab"), consumer);
     Assert.assertEquals(0, mutations.size());
+    mutations.clear();
 
-    mutations = genMutations("k2", 2, Optional.of("b"), Optional.of("ab"));
+    genMutations("k2", 2, Optional.of("b"), Optional.of("ab"), consumer);
     Assert.assertEquals(1, mutations.size());
     Assert.assertTrue(mutations.contains(makePut("k2", "a", 2)));
+    mutations.clear();
 
-    mutations = genMutations("k3", 3, Optional.of("c"), Optional.of("d"));
+    genMutations("k3", 3, Optional.of("c"), Optional.of("d"), consumer);
     Assert.assertEquals(1, mutations.size());
     Mutation m = makeDel("k3", "c", 3);
     addPut(m, "k3", "d", 3);
     Assert.assertTrue(mutations.contains(m));
+    mutations.clear();
 
-    mutations = genMutations("k4", 4, Optional.of("e"), Optional.empty());
+    genMutations("k4", 4, Optional.of("e"), Optional.empty(), consumer);
     Assert.assertEquals(1, mutations.size());
     Assert.assertTrue(mutations.contains(makeDel("k4", "e", 4)));
+    mutations.clear();
 
-    mutations = genMutations("k5", 5, Optional.of("ef"), Optional.of("fg"));
+    genMutations("k5", 5, Optional.of("ef"), Optional.of("fg"), consumer);
     Assert.assertEquals(1, mutations.size());
     m = makeDel("k5", "e", 5);
     addPut(m, "k5", "g", 5);
     Assert.assertTrue(mutations.contains(m));
+    mutations.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/573aeb78/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
----------------------------------------------------------------------
diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
index 14c1aa4..d48857e 100644
--- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
+++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java
@@ -15,8 +15,7 @@
 
 package org.apache.fluo.recipes.test.export;
 
-import java.util.Collection;
-import java.util.Collections;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
@@ -25,9 +24,9 @@ import org.apache.fluo.recipes.core.export.SequencedExport;
 public class SimpleExporter extends AccumuloExporter<String, String> {
 
   @Override
-  protected Collection<Mutation> translate(SequencedExport<String, String> 
export) {
+  protected void translate(SequencedExport<String, String> export, 
Consumer<Mutation> consumer) {
     Mutation m = new Mutation(export.getKey());
     m.put("cf", "cq", export.getSequence(), export.getValue());
-    return Collections.singleton(m);
+    consumer.accept(m);
   }
 }

Reply via email to