Enhance Map_Keys and add the ability to properly page through the keys of a map.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fe197af2 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fe197af2 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fe197af2 Branch: refs/heads/asf-site Commit: fe197af239445349ca405ec7a63621c49d34fa65 Parents: ceb50ff Author: Michael Russo <[email protected]> Authored: Thu Aug 18 17:50:30 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Thu Aug 18 17:50:30 2016 -0700 ---------------------------------------------------------------------- .../core/migration/util/AstayanxUtils.java | 49 ------- .../usergrid/persistence/map/MapKeyResults.java | 42 ++++++ .../usergrid/persistence/map/MapManager.java | 7 + .../persistence/map/impl/MapManagerImpl.java | 8 ++ .../persistence/map/impl/MapSerialization.java | 11 ++ .../map/impl/MapSerializationImpl.java | 84 ++++++++--- .../persistence/map/MapManagerTest.java | 139 ++++++++++++++++++- 7 files changed, 268 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java deleted file mode 100644 index 7ae4748..0000000 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.core.migration.util; - - -import com.netflix.astyanax.connectionpool.exceptions.BadRequestException; - - -public class AstayanxUtils { - - /** - * Return true if the exception is an instance of a missing keysapce - * @param rethrowMessage The message to add to the exception if rethrown - * @param cassandraException The exception from cassandar - * @return - */ - public static void isKeyspaceMissing(final String rethrowMessage, final Exception cassandraException ) { - - if ( cassandraException instanceof BadRequestException ) { - - //check if it's b/c the keyspace is missing, if so - final String message = cassandraException.getMessage(); - - //no op, just swallow - if(message.contains( "why:Keyspace" ) && message.contains( "does not exist" )){ - return; - }; - } - - throw new RuntimeException( rethrowMessage, cassandraException ); - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapKeyResults.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapKeyResults.java 
b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapKeyResults.java new file mode 100644 index 0000000..0d34856 --- /dev/null +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapKeyResults.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. 
+ */ +package org.apache.usergrid.persistence.map; + +import java.util.List; + + +public class MapKeyResults { + + private String cursor; + private List<String> keys; + + public MapKeyResults(final String cursor, final List<String> keys){ + + this.cursor = cursor; + this.keys = keys; + + } + + public String getCursor() { + return cursor; + } + + public List<String> getKeys() { + return keys; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java index 80e2d17..5fffaca 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java @@ -90,4 +90,11 @@ public interface MapManager { * @param key The key used to delete the entry */ void delete( final String key ); + + /** + * Return a page of keys that exist within the map + * @param cursor + * @param limit + */ + MapKeyResults getKeys(final String cursor, final int limit); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java index cafa4a1..86e4001 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java +++ 
b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java @@ -19,9 +19,11 @@ package org.apache.usergrid.persistence.map.impl; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.usergrid.persistence.map.MapKeyResults; import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.map.MapScope; @@ -110,6 +112,12 @@ public class MapManagerImpl implements MapManager { mapSerialization.delete(scope,key); } + @Override + public MapKeyResults getKeys(final String cursor, final int limit){ + return mapSerialization.getAllKeys(scope, cursor, limit); + } + + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java index e9c21d2..a60f5ac 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java @@ -21,10 +21,12 @@ package org.apache.usergrid.persistence.map.impl; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.usergrid.persistence.core.migration.schema.Migration; +import org.apache.usergrid.persistence.map.MapKeyResults; import org.apache.usergrid.persistence.map.MapScope; @@ -87,4 +89,13 @@ public interface MapSerialization extends Migration { * @param key The key used to delete the entry */ void delete( final MapScope scope, final String key ); + + /** + * Get a list of keys for the given map scope. 
+ * @param cursor Optional pagingState + * @param limit number of keys to return + * @return List of keys + */ + MapKeyResults getAllKeys(final MapScope mapScope, final String cursor, final int limit); + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java index 735f2b8..282974c 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java @@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.core.datastax.CQLUtils; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator; import org.apache.usergrid.persistence.core.shard.StringHashUtils; +import org.apache.usergrid.persistence.map.MapKeyResults; import org.apache.usergrid.persistence.map.MapScope; import com.google.common.base.Preconditions; @@ -41,6 +42,8 @@ import com.google.common.hash.Funnel; import com.google.inject.Inject; import com.google.inject.Singleton; +import static org.apache.commons.lang.StringUtils.isBlank; + @Singleton public class MapSerializationImpl implements MapSerialization { @@ -67,7 +70,7 @@ public class MapSerializationImpl implements MapSerialization { put( "column1", DataType.Name.BLOB ); put( "value", DataType.Name.BLOB ); }}; private static final Map<String, String> MAP_KEYS_CLUSTERING_ORDER = - new HashMap<String, String>(){{ put( "column1", "ASC" ); }}; + new HashMap<String, String>(){{ put( "column1", "DESC" ); }}; @@ 
-162,12 +165,12 @@ public class MapSerializationImpl implements MapSerialization { .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); - final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); + final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() ); mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE) .using(timeToLive) - .value("key", getMapKeyPartitionKey(scope, key, bucket)) - .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)) - .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); + .value("key", getMapKeyPartitionKey(scope, bucket)) + .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)); }else{ mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE) @@ -176,12 +179,12 @@ public class MapSerializationImpl implements MapSerialization { .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); // get a bucket number for the map keys table - final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); + final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() ); mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE) - .value("key", getMapKeyPartitionKey(scope, key, bucket)) - .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)) - .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); + .value("key", getMapKeyPartitionKey(scope, bucket)) + .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)); } @@ -216,10 +219,10 @@ public class MapSerializationImpl implements MapSerialization { session.execute(mapEntry); - final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); + final int bucket = BUCKET_LOCATOR.getCurrentBucket( 
scope.getName() ); Statement mapKey; mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE) - .value("key", getMapKeyPartitionKey(scope, key, bucket)) + .value("key", getMapKeyPartitionKey(scope, bucket)) .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED)) .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED)); @@ -253,12 +256,12 @@ public class MapSerializationImpl implements MapSerialization { session.execute(mapEntry); - final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); + final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() ); Statement mapKey; mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE) - .value("key", getMapKeyPartitionKey(scope, key, bucket)) + .value("key", getMapKeyPartitionKey(scope, bucket)) .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED)) - .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED)); + .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)); session.execute(mapKey); } @@ -276,16 +279,17 @@ public class MapSerializationImpl implements MapSerialization { // not sure which bucket the value is in, execute a delete against them all - final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key ); + final int[] buckets = BUCKET_LOCATOR.getAllBuckets( scope.getName() ); List<ByteBuffer> mapKeys = new ArrayList<>(); for( int bucket : buckets){ - mapKeys.add( getMapKeyPartitionKey(scope, key, bucket)); + mapKeys.add( getMapKeyPartitionKey(scope, bucket)); } Statement deleteMapKey; Clause inKey = QueryBuilder.in("key", mapKeys); + Clause column1Equals = QueryBuilder.eq("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED)); deleteMapKey = QueryBuilder.delete().from(MAP_KEYS_TABLE) - .where(inKey); + .where(inKey).and(column1Equals); session.execute(deleteMapKey); @@ -318,6 +322,48 @@ public class MapSerializationImpl implements MapSerialization { } + @Override + 
public MapKeyResults getAllKeys(final MapScope scope, final String cursor, final int limit ){ + + final int[] buckets = BUCKET_LOCATOR.getAllBuckets( scope.getName() ); + final List<ByteBuffer> partitionKeys = new ArrayList<>(NUM_BUCKETS.length); + + for (int bucket : buckets) { + + partitionKeys.add(getMapKeyPartitionKey(scope, bucket)); + } + + Clause in = QueryBuilder.in("key", partitionKeys); + + Statement statement; + if( isBlank(cursor) ){ + statement = QueryBuilder.select().all().from(MAP_KEYS_TABLE) + .where(in) + .setFetchSize(limit); + }else{ + statement = QueryBuilder.select().all().from(MAP_KEYS_TABLE) + .where(in) + .setFetchSize(limit) + .setPagingState(PagingState.fromString(cursor)); + } + + + ResultSet resultSet = session.execute(statement); + PagingState pagingState = resultSet.getExecutionInfo().getPagingState(); + + final List<String> keys = new ArrayList<>(); + Iterator<Row> resultIterator = resultSet.iterator(); + int size = 0; + while( resultIterator.hasNext() && size < limit){ + + size++; + keys.add((String)DataType.text().deserialize(resultIterator.next().getBytes("column1"), ProtocolVersion.NEWEST_SUPPORTED)); + + } + + return new MapKeyResults(pagingState != null ? 
pagingState.toString() : null, keys); + + } private ByteBuffer getValueCQL( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) { @@ -456,10 +502,10 @@ public class MapSerializationImpl implements MapSerialization { } - private ByteBuffer getMapKeyPartitionKey(MapScope scope, String key, int bucketNumber){ + private ByteBuffer getMapKeyPartitionKey(MapScope scope, int bucketNumber){ return serializeKeys(scope.getApplication().getUuid(), - scope.getApplication().getType(), scope.getName(), key, bucketNumber); + scope.getApplication().getType(), scope.getName(), "", bucketNumber); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java index 2a68247..254c915 100644 --- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java +++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java @@ -20,10 +20,7 @@ package org.apache.usergrid.persistence.map; -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; import org.junit.Before; @@ -83,6 +80,140 @@ public class MapManagerTest { } @Test + public void getAllKeys(){ + + MapManager mm = mmf.createMapManager(this.scope); + + final String key1 = "key1"; + final String key2 = "key2";; + final String value = "value"; + + mm.putString( key1, value ); + mm.putString( key2, value ); + + MapKeyResults keyResults = mm.getKeys(null, 5); + + assertEquals(2, keyResults.getKeys().size()); + + } + + @Test + public void getAllKeysWithStart(){ + + MapManager 
mm = mmf.createMapManager(this.scope); + + final String value = "value"; + + final String key1 = "key1"; + final String key2 = "key2"; + final String key3 = "key3"; + final String key4 = "key4"; + final String key5 = "key5"; + final String key6 = "key6"; + + mm.putString( key1, value ); + mm.putString( key2, value ); + mm.putString( key3, value ); + mm.putString( key4, value ); + mm.putString( key5, value ); + mm.putString( key6, value ); + + MapKeyResults keyResults = mm.getKeys(null, 3); + + assertEquals(3, keyResults.getKeys().size()); + assertEquals(key1, keyResults.getKeys().get(0)); + + assertNotNull(keyResults.getCursor()); + + MapKeyResults keyResults2 = mm.getKeys(keyResults.getCursor(), 3); + + assertEquals(3, keyResults2.getKeys().size()); + assertEquals(key4, keyResults2.getKeys().get(0)); + + + } + + @Test + public void getAllKeysAfterDelete(){ + + MapManager mm = mmf.createMapManager(this.scope); + + final String value = "value"; + + final String key1 = "key1"; + final String key2 = "key2"; + final String key3 = "key3"; + final String key4 = "key4"; + final String key5 = "key5"; + final String key6 = "key6"; + + mm.putString( key1, value ); + mm.putString( key2, value ); + mm.putString( key3, value ); + mm.putString( key4, value ); + mm.putString( key5, value ); + mm.putString( key6, value ); + + MapKeyResults keyResults = mm.getKeys(null, 6); + + assertEquals(6, keyResults.getKeys().size()); + assertEquals(key1, keyResults.getKeys().get(0)); + + + mm.delete(key1); + mm.delete(key2); + mm.delete(key3); + + MapKeyResults keyResults2 = mm.getKeys(null, 6); + + assertEquals(3, keyResults2.getKeys().size()); + assertEquals(key4, keyResults2.getKeys().get(0)); + + + } + + @Test + public void getAllKeysAfterMassDelete(){ + + MapManager mm = mmf.createMapManager(this.scope); + + final String value = "value"; + final int count = 11000; + + for ( int i=0; i < 11000; i++){ + mm.putString("key"+i, value); + } + + boolean done = false; + String cursor = null; + 
int total = 0; + + while(!done && total < count){ + + MapKeyResults keyResults = mm.getKeys(cursor, 1000); + + if( keyResults.getCursor() == null){ + done=true; + } + + cursor = keyResults.getCursor(); + total += keyResults.getKeys().size(); + + } + + assertEquals(count, total); + + for ( int i=0; i < count - 500; i++){ + mm.delete("key"+i); + } + + MapKeyResults keyResults2 = mm.getKeys(null, 1000); + assertEquals(500, keyResults2.getKeys().size()); + + + } + + @Test public void writeReadStringWithLongKey() { MapManager mm = mmf.createMapManager( this.scope );
