Author: kturner
Date: Tue May  7 14:38:27 2013
New Revision: 1479924

URL: http://svn.apache.org/r1479924
Log:
ACCUMULO-1337 patch from Pushpinder Heer to add WholeColumnFamilyIterator

Added:
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java
    
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java

Added: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java?rev=1479924&view=auto
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java
 (added)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIterator.java
 Tue May  7 14:38:27 2013
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * 
+ * The WholeColumnFamilyIterator is designed to provide row/cf-isolation so 
that queries see mutations as atomic. It does so by grouping row/Column family 
(as
+ * key) and rest of data as Value into a single key/value pair, which is 
returned through the client as an atomic operation.
+ * 
+ * To regain the original key/value pairs of the row, call the decodeRow 
function on the key/value pair that this iterator returned.
+ * 
+ * @since 1.6.0
+ */
+public class WholeColumnFamilyIterator implements 
SortedKeyValueIterator<Key,Value>, OptionDescriber {
+  
+  private SortedKeyValueIterator<Key,Value> sourceIter;
+  private Key topKey = null;
+  private Value topValue = null;
+  
+  public WholeColumnFamilyIterator() {
+
+  }
+  
+  WholeColumnFamilyIterator(SortedKeyValueIterator<Key,Value> source) {
+    this.sourceIter = source;
+  }
+  
+  /**
+   * Decode whole row/cf out of value. decode key value pairs that have been 
encoded into a single // value
+   * 
+   * @param rowKey
+   *          the row key to decode
+   * @param rowValue
+   *          the value to decode
+   * @return the sorted map. After decoding the flattened data map
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static final SortedMap<Key,Value> decodeColumnFamily(Key rowKey, 
Value rowValue) throws IOException {
+    SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+    ByteArrayInputStream in = new ByteArrayInputStream(rowValue.get());
+    DataInputStream din = new DataInputStream(in);
+    int numKeys = din.readInt();
+    for (int i = 0; i < numKeys; i++) {
+      byte[] cq;
+      byte[] cv;
+      byte[] valBytes;
+      // read the col qual
+      {
+        int len = din.readInt();
+        cq = new byte[len];
+        din.read(cq);
+      }
+      // read the col visibility
+      {
+        int len = din.readInt();
+        cv = new byte[len];
+        din.read(cv);
+      }
+      // read the timestamp
+      long timestamp = din.readLong();
+      // read the value
+      {
+        int len = din.readInt();
+        valBytes = new byte[len];
+        din.read(valBytes);
+      }
+      map.put(new Key(rowKey.getRowData().toArray(), 
rowKey.getColumnFamilyData().toArray(), cq, cv, timestamp, false, false), new 
Value(valBytes, false));
+    }
+    return map;
+  }
+  
+  /**
+   * Encode row/cf. Take a stream of keys and values and output a value that 
encodes everything but their row and column families keys and values must be 
paired
+   * one for one
+   * 
+   * @param keys
+   *          the row keys to encode into value
+   * @param values
+   *          the value to encode
+   * @return the value. After encoding keys/values
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static final Value encodeColumnFamily(List<Key> keys, List<Value> 
values) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutputStream dout = new DataOutputStream(out);
+    dout.writeInt(keys.size());
+    for (int i = 0; i < keys.size(); i++) {
+      Key k = keys.get(i);
+      Value v = values.get(i);
+      // write the colqual
+      {
+        ByteSequence bs = k.getColumnQualifierData();
+        dout.writeInt(bs.length());
+        dout.write(bs.getBackingArray(), bs.offset(), bs.length());
+      }
+      // write the column visibility
+      {
+        ByteSequence bs = k.getColumnVisibilityData();
+        dout.writeInt(bs.length());
+        dout.write(bs.getBackingArray(), bs.offset(), bs.length());
+      }
+      // write the timestamp
+      dout.writeLong(k.getTimestamp());
+      // write the value
+      byte[] valBytes = v.get();
+      dout.writeInt(valBytes.length);
+      dout.write(valBytes);
+    }
+    
+    return new Value(out.toByteArray());
+  }
+  
+  List<Key> keys = new ArrayList<Key>();
+  List<Value> values = new ArrayList<Value>();
+  
+  private void prepKeys() throws IOException {
+    if (topKey != null)
+      return;
+    Text currentRow;
+    Text currentCf;
+    
+    do {
+      if (sourceIter.hasTop() == false)
+        return;
+      currentRow = new Text(sourceIter.getTopKey().getRow());
+      currentCf = new Text(sourceIter.getTopKey().getColumnFamily());
+      
+      keys.clear();
+      values.clear();
+      while (sourceIter.hasTop() && 
sourceIter.getTopKey().getRow().equals(currentRow) && 
sourceIter.getTopKey().getColumnFamily().equals(currentCf)) {
+        keys.add(new Key(sourceIter.getTopKey()));
+        values.add(new Value(sourceIter.getTopValue()));
+        sourceIter.next();
+      }
+    } while (!filter(currentRow, keys, values));
+    
+    topKey = new Key(currentRow, currentCf);
+    topValue = encodeColumnFamily(keys, values);
+    
+  }
+  
+  /**
+   * 
+   * @param currentRow
+   *          All keys & cf have this in their row portion (do not modify!).
+   * @param keys
+   *          One key for each key & cf group in the row, ordered as they are 
given by the source iterator (do not modify!).
+   * @param values
+   *          One value for each key in keys, ordered to correspond to the 
ordering in keys (do not modify!).
+   * @return true if we want to keep the row, false if we want to skip it
+   */
+  protected boolean filter(Text currentRow, List<Key> keys, List<Value> 
values) {
+    return true;
+  }
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    if (sourceIter != null)
+      return new WholeColumnFamilyIterator(sourceIter.deepCopy(env));
+    return new WholeColumnFamilyIterator();
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return topKey;
+  }
+  
+  @Override
+  public Value getTopValue() {
+    return topValue;
+  }
+  
+  @Override
+  public boolean hasTop() {
+    return topKey != null || sourceIter.hasTop();
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
+    sourceIter = source;
+  }
+  
+  @Override
+  public void next() throws IOException {
+    topKey = null;
+    topValue = null;
+    prepKeys();
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive) throws IOException {
+    topKey = null;
+    topValue = null;
+    
+    Key sk = range.getStartKey();
+    
+    if (sk != null && sk.getColumnQualifierData().length() == 0 && 
sk.getColumnVisibilityData().length() == 0 && sk.getTimestamp() == 
Long.MAX_VALUE
+        && !range.isStartKeyInclusive()) {
+      // assuming that we are seeking using a key previously returned by
+      // this iterator
+      // therefore go to the next row/cf
+      Key followingRowKey = sk.followingKey(PartialKey.ROW_COLFAM);
+      if (range.getEndKey() != null && 
followingRowKey.compareTo(range.getEndKey()) > 0)
+        return;
+      
+      range = new Range(sk.followingKey(PartialKey.ROW_COLFAM), true, 
range.getEndKey(), range.isEndKeyInclusive());
+    }
+    
+    sourceIter.seek(range, columnFamilies, inclusive);
+    prepKeys();
+  }
+  
+  @Override
+  public IteratorOptions describeOptions() {
+    return new IteratorOptions("wholecolumnfamilyiterator", 
"WholeColumnFamilyIterator. Group equal row & column family into single row 
entry.", null, null);
+  }
+  
+  @Override
+  public boolean validateOptions(Map<String,String> options) {
+    return true;
+  }
+  
+}

Added: 
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java?rev=1479924&view=auto
==============================================================================
--- 
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java
 (added)
+++ 
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/WholeColumnFamilyIteratorTest.java
 Tue May  7 14:38:27 2013
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+
+public class WholeColumnFamilyIteratorTest extends TestCase {
+  
+  public void testEmptyStuff() throws IOException {
+    SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+    SortedMap<Key,Value> map2 = new TreeMap<Key,Value>();
+    final Map<Text,Boolean> toInclude = new HashMap<Text,Boolean>();
+    map.put(new Key(new Text("r1"), new Text("cf1"), new Text("cq1"), new 
Text("cv1"), 1l), new Value("val1".getBytes()));
+    map.put(new Key(new Text("r1"), new Text("cf1"), new Text("cq2"), new 
Text("cv1"), 2l), new Value("val2".getBytes()));
+    map.put(new Key(new Text("r2"), new Text("cf1"), new Text("cq1"), new 
Text("cv1"), 3l), new Value("val3".getBytes()));
+    map.put(new Key(new Text("r2"), new Text("cf2"), new Text("cq1"), new 
Text("cv1"), 4l), new Value("val4".getBytes()));
+    map.put(new Key(new Text("r3"), new Text("cf1"), new Text("cq1"), new 
Text("cv1"), 5l), new Value("val4".getBytes()));
+    map.put(new Key(new Text("r3"), new Text("cf1"), new Text("cq1"), new 
Text("cv2"), 6l), new Value("val4".getBytes()));
+    map.put(new Key(new Text("r4"), new Text("cf1"), new Text("cq1"), new 
Text("cv1"), 7l), new Value("".getBytes()));
+    map.put(new Key(new Text("r4"), new Text("cf1"), new Text("cq1"), new 
Text(""), 8l), new Value("val1".getBytes()));
+    map.put(new Key(new Text("r4"), new Text("cf1"), new Text(""), new 
Text("cv1"), 9l), new Value("val1".getBytes()));
+    map.put(new Key(new Text("r4"), new Text(""), new Text("cq1"), new 
Text("cv1"), 10l), new Value("val1".getBytes()));
+    map.put(new Key(new Text(""), new Text("cf1"), new Text("cq1"), new 
Text("cv1"), 11l), new Value("val1".getBytes()));
+    boolean b = true;
+    int trueCount = 0;
+    for (Key k : map.keySet()) {
+      if (toInclude.containsKey(k.getRow())) {
+        if (toInclude.get(k.getRow())) {
+          map2.put(k, map.get(k));
+        }
+        continue;
+      }
+      b = !b;
+      toInclude.put(k.getRow(), b);
+      if (b) {
+        trueCount++;
+        map2.put(k, map.get(k));
+      }
+    }
+    SortedMapIterator source = new SortedMapIterator(map);
+    WholeColumnFamilyIterator iter = new WholeColumnFamilyIterator(source);
+    SortedMap<Key,Value> resultMap = new TreeMap<Key,Value>();
+    iter.seek(new Range(), new ArrayList<ByteSequence>(), false);
+    int numRows = 0;
+    while (iter.hasTop()) {
+      numRows++;
+      Key rowKey = iter.getTopKey();
+      Value rowValue = iter.getTopValue();
+      resultMap.putAll(WholeColumnFamilyIterator.decodeColumnFamily(rowKey, 
rowValue));
+      iter.next();
+    }
+    
+    // we have 7 groups of row key/cf
+    Assert.assertEquals(7, numRows);
+    
+    assertEquals(resultMap, map);
+    
+    WholeColumnFamilyIterator iter2 = new WholeColumnFamilyIterator(source) {
+      @Override
+      public boolean filter(Text row, List<Key> keys, List<Value> values) {
+        return toInclude.get(row);
+      }
+    };
+    resultMap.clear();
+    iter2.seek(new Range(), new ArrayList<ByteSequence>(), false);
+    numRows = 0;
+    while (iter2.hasTop()) {
+      numRows++;
+      Key rowKey = iter2.getTopKey();
+      Value rowValue = iter2.getTopValue();
+      resultMap.putAll(WholeColumnFamilyIterator.decodeColumnFamily(rowKey, 
rowValue));
+      iter2.next();
+    }
+    assertTrue(numRows == trueCount);
+    assertEquals(resultMap, map2);
+  }
+  
+  private void pkv(SortedMap<Key,Value> map, String row, String cf, String cq, 
String cv, long ts, String val) {
+    map.put(new Key(new Text(row), new Text(cf), new Text(cq), new Text(cv), 
ts), new Value(val.getBytes()));
+  }
+  
+  public void testContinue() throws Exception {
+    SortedMap<Key,Value> map1 = new TreeMap<Key,Value>();
+    pkv(map1, "row1", "cf1", "cq1", "cv1", 5, "foo");
+    pkv(map1, "row1", "cf1", "cq2", "cv1", 6, "bar");
+    
+    SortedMap<Key,Value> map2 = new TreeMap<Key,Value>();
+    pkv(map2, "row2", "cf1", "cq1", "cv1", 5, "foo");
+    pkv(map2, "row2", "cf1", "cq2", "cv1", 6, "bar");
+    
+    SortedMap<Key,Value> map3 = new TreeMap<Key,Value>();
+    pkv(map3, "row3", "cf1", "cq1", "cv1", 5, "foo");
+    pkv(map3, "row3", "cf1", "cq2", "cv1", 6, "bar");
+    
+    SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+    map.putAll(map1);
+    map.putAll(map2);
+    map.putAll(map3);
+    
+    SortedMapIterator source = new SortedMapIterator(map);
+    WholeColumnFamilyIterator iter = new WholeColumnFamilyIterator(source);
+    
+    Range range = new Range(new Text("row1"), true, new Text("row2"), true);
+    iter.seek(range, new ArrayList<ByteSequence>(), false);
+    
+    assertTrue(iter.hasTop());
+    assertEquals(map1, 
WholeColumnFamilyIterator.decodeColumnFamily(iter.getTopKey(), 
iter.getTopValue()));
+    
+    // simulate something continuing using the last key from the iterator
+    // this is what client and server code will do
+    range = new Range(iter.getTopKey(), false, range.getEndKey(), 
range.isEndKeyInclusive());
+    iter.seek(range, new ArrayList<ByteSequence>(), false);
+    
+    assertTrue(iter.hasTop());
+    assertEquals(map2, 
WholeColumnFamilyIterator.decodeColumnFamily(iter.getTopKey(), 
iter.getTopValue()));
+    
+    iter.next();
+    
+    assertFalse(iter.hasTop());
+    
+  }
+  
+  public void testBug1() throws Exception {
+    SortedMap<Key,Value> map1 = new TreeMap<Key,Value>();
+    pkv(map1, "row1", "cf1", "cq1", "cv1", 5, "foo");
+    pkv(map1, "row1", "cf1", "cq2", "cv1", 6, "bar");
+    
+    SortedMap<Key,Value> map2 = new TreeMap<Key,Value>();
+    pkv(map2, "row2", "cf1", "cq1", "cv1", 5, "foo");
+    
+    SortedMap<Key,Value> map = new TreeMap<Key,Value>();
+    map.putAll(map1);
+    map.putAll(map2);
+    
+    MultiIterator source = new 
MultiIterator(Collections.singletonList((SortedKeyValueIterator<Key,Value>) new 
SortedMapIterator(map)), new Range(null, true,
+        new Text("row1"), true));
+    WholeColumnFamilyIterator iter = new WholeColumnFamilyIterator(source);
+    
+    Range range = new Range(new Text("row1"), true, new Text("row2"), true);
+    iter.seek(range, new ArrayList<ByteSequence>(), false);
+    
+    assertTrue(iter.hasTop());
+    assertEquals(map1, 
WholeColumnFamilyIterator.decodeColumnFamily(iter.getTopKey(), 
iter.getTopValue()));
+    
+    // simulate something continuing using the last key from the iterator
+    // this is what client and server code will do
+    range = new Range(iter.getTopKey(), false, range.getEndKey(), 
range.isEndKeyInclusive());
+    iter.seek(range, new ArrayList<ByteSequence>(), false);
+    
+    assertFalse(iter.hasTop());
+    
+  }
+  
+}


Reply via email to