http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java new file mode 100644 index 0000000..7660e8f --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.executor.hash; + +import org.apache.geode.cache.Region; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisConstants.ArityDef; +import org.apache.geode.redis.internal.RedisDataType; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class HValsExecutor extends HashExecutor { + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElems = command.getProcessedCommand(); + + if (commandElems.size() < 2) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.HVALS)); + return; + } + + ByteArrayWrapper key = command.getKey(); + checkDataType(key, RedisDataType.REDIS_HASH, context); + + Region<ByteArrayWrapper, ByteArrayWrapper> keyRegion = getRegion(context, key); + + if (keyRegion == null) { + command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator())); + return; + } + + Collection<ByteArrayWrapper> vals = new ArrayList(keyRegion.values()); + + if (vals.isEmpty()) { + command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator())); + return; + } + + command.setResponse(Coder.getBulkStringArrayResponse(context.getByteBufAllocator(), vals)); + } + +}
http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HashExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HashExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HashExecutor.java new file mode 100644 index 0000000..1ef7a07 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HashExecutor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
/**
 * Base class for the hash command executors. It provides typed access to the region that backs a
 * hash key.
 */
public abstract class HashExecutor extends AbstractExecutor {

  /**
   * Position of the first field argument within a processed hash command.
   * NOTE(review): meaning inferred from the name; confirm against the subclasses that use it.
   * Declared static because the value is a constant and never varies per instance.
   */
  protected static final int FIELD_INDEX = 2;

  /**
   * Asks the context's region provider to get or create the region for {@code key} with the given
   * data type.
   *
   * @param context the execution context whose region provider is used
   * @param key the key naming the hash
   * @param type the data type passed through to the region provider
   * @return whatever the region provider returns, cast to the hash region type
   */
  @SuppressWarnings("unchecked")
  protected Region<ByteArrayWrapper, ByteArrayWrapper> getOrCreateRegion(
      ExecutionHandlerContext context, ByteArrayWrapper key, RedisDataType type) {
    return (Region<ByteArrayWrapper, ByteArrayWrapper>) context.getRegionProvider()
        .getOrCreateRegion(key, type, context);
  }

  /**
   * Looks up the region for {@code key} through the context's region provider without creating
   * it.
   *
   * @param context the execution context whose region provider is used
   * @param key the key naming the hash
   * @return whatever the region provider returns, cast to the hash region type; callers in this
   *         package treat {@code null} as "no such hash"
   */
  @SuppressWarnings("unchecked")
  protected Region<ByteArrayWrapper, ByteArrayWrapper> getRegion(ExecutionHandlerContext context,
      ByteArrayWrapper key) {
    return (Region<ByteArrayWrapper, ByteArrayWrapper>) context.getRegionProvider().getRegion(key);
  }

}
/**
 * Base class for the HyperLogLog command executors. It holds the default sketch parameters and
 * the type check used before a key is written as an HLL.
 */
public abstract class HllExecutor extends AbstractExecutor {

  public static final Double DEFAULT_HLL_STD_DEV = 0.081;
  public static final Integer DEFAULT_HLL_DENSE = 18;
  public static final Integer DEFAULT_HLL_SPARSE = 32;

  /**
   * Registers {@code key} as an HLL in the region provider's metadata unless a type is already
   * registered, and rejects the key when the registered type is not HLL.
   *
   * @param key the key about to be used as an HLL
   * @param context the execution context whose region provider holds the metadata
   * @throws RedisDataTypeMismatchException if the key is protected or already registered with a
   *         type other than HLL
   */
  protected final void checkAndSetDataType(ByteArrayWrapper key, ExecutionHandlerContext context) {
    Object previousType =
        context.getRegionProvider().metaPutIfAbsent(key, RedisDataType.REDIS_HLL);

    if (previousType == RedisDataType.REDIS_PROTECTED) {
      throw new RedisDataTypeMismatchException("The key name \"" + key + "\" is protected");
    }

    boolean heldByOtherType = previousType != null && previousType != RedisDataType.REDIS_HLL;
    if (heldByOtherType) {
      throw new RedisDataTypeMismatchException(
          "The key name \"" + key + "\" is already used by a " + previousType.toString());
    }
  }
}
/**
 * Executor for the Redis {@code PFADD} command: offers every supplied element to the HyperLogLog
 * stored at the key and reports whether any offer changed the sketch.
 */
public class PFAddExecutor extends HllExecutor {

  /**
   * Replies with an arity error when fewer than two command elements were received. Otherwise it
   * loads the sketch for the key (creating a fresh one when absent), offers each element from
   * index 2 onward, stores the sketch back, and replies with integer 1 if any offer reported a
   * change, else 0.
   *
   * @param command the client command; the response is set on it
   * @param context the execution context supplying the buffer allocator and the HLL region
   */
  @Override
  public void executeCommand(Command command, ExecutionHandlerContext context) {
    List<byte[]> elements = command.getProcessedCommand();

    if (elements.size() < 2) {
      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.PFADD));
      return;
    }

    ByteArrayWrapper key = command.getKey();
    checkAndSetDataType(key, context);
    Region<ByteArrayWrapper, HyperLogLogPlus> hllRegion =
        context.getRegionProvider().gethLLRegion();

    HyperLogLogPlus sketch = hllRegion.get(key);
    if (sketch == null) {
      sketch = new HyperLogLogPlus(DEFAULT_HLL_DENSE);
    }

    // Non-short-circuit OR: every element must be offered even after a change has been seen.
    boolean modified = false;
    for (byte[] element : elements.subList(2, elements.size())) {
      modified |= sketch.offer(element);
    }

    // The sketch is written back even when nothing changed, so the key exists afterwards.
    hllRegion.put(key, sketch);

    command.setResponse(
        Coder.getIntegerResponse(context.getByteBufAllocator(), modified ? 1 : 0));
  }

}
/**
 * Executor for the Redis {@code PFCOUNT} command: replies with the cardinality estimate of the
 * merge of the HyperLogLogs stored at the given keys.
 */
public class PFCountExecutor extends HllExecutor {

  /**
   * Replies with an arity error when fewer than two command elements were received, with integer
   * 0 when none of the keys holds a sketch, and otherwise with the cardinality of the merge of
   * all sketches found.
   *
   * @param command the client command; the response is set on it
   * @param context the execution context supplying the buffer allocator and the HLL region
   * @throws RuntimeException wrapping a {@link CardinalityMergeException} raised by the merge
   */
  @Override
  public void executeCommand(Command command, ExecutionHandlerContext context) {
    List<byte[]> elements = command.getProcessedCommand();

    if (elements.size() < 2) {
      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.PFCOUNT));
      return;
    }

    Region<ByteArrayWrapper, HyperLogLogPlus> hllRegion =
        context.getRegionProvider().gethLLRegion();

    // Every element after the command name is a key; keys without a sketch are skipped.
    List<HyperLogLogPlus> found = new ArrayList<HyperLogLogPlus>();
    for (byte[] rawKey : elements.subList(1, elements.size())) {
      ByteArrayWrapper wrappedKey = new ByteArrayWrapper(rawKey);
      checkDataType(wrappedKey, RedisDataType.REDIS_HLL, context);
      HyperLogLogPlus sketch = hllRegion.get(wrappedKey);
      if (sketch != null) {
        found.add(sketch);
      }
    }

    if (found.isEmpty()) {
      command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), 0));
      return;
    }

    // Merge the first sketch with all the others.
    HyperLogLogPlus first = found.get(0);
    List<HyperLogLogPlus> others = found.subList(1, found.size());
    HyperLogLogPlus[] estimators = others.toArray(new HyperLogLogPlus[others.size()]);

    HyperLogLogPlus merged;
    try {
      merged = (HyperLogLogPlus) first.merge(estimators);
    } catch (CardinalityMergeException e) {
      throw new RuntimeException(e);
    }

    long cardinality = merged.cardinality();
    command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), cardinality));
  }

}
http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFMergeExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFMergeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFMergeExecutor.java new file mode 100644 index 0000000..ee3eacf --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hll/PFMergeExecutor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
/**
 * Executor for the Redis {@code PFMERGE} command: merges the HyperLogLogs stored at the source
 * keys into the sketch stored at the destination key.
 */
public class PFMergeExecutor extends HllExecutor {

  /**
   * Replies with an arity error when fewer than three command elements were received. Otherwise
   * the destination sketch (a fresh one when absent) is merged with every source sketch found,
   * stored under the destination key, and the reply is the simple string {@code OK}.
   *
   * <p>
   * When none of the source keys holds a sketch the reply is still {@code OK}. In that case the
   * destination key is removed only if it held no sketch before this command; an existing
   * destination sketch is left untouched.
   *
   * @param command the client command; the response is set on it
   * @param context the execution context supplying the buffer allocator and the HLL region
   * @throws RuntimeException wrapping a {@link CardinalityMergeException} raised by the merge
   */
  @Override
  public void executeCommand(Command command, ExecutionHandlerContext context) {
    List<byte[]> commandElems = command.getProcessedCommand();

    if (commandElems.size() < 3) {
      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.PFMERGE));
      return;
    }

    ByteArrayWrapper destKey = command.getKey();
    checkAndSetDataType(destKey, context);
    Region<ByteArrayWrapper, HyperLogLogPlus> keyRegion =
        context.getRegionProvider().gethLLRegion();

    HyperLogLogPlus existingDest = keyRegion.get(destKey);
    HyperLogLogPlus mergedHLL =
        existingDest != null ? existingDest : new HyperLogLogPlus(DEFAULT_HLL_DENSE);

    // Elements from index 2 onward are source keys; keys without a sketch are skipped.
    List<HyperLogLogPlus> hlls = new ArrayList<HyperLogLogPlus>();
    for (int i = 2; i < commandElems.size(); i++) {
      ByteArrayWrapper k = new ByteArrayWrapper(commandElems.get(i));
      checkDataType(k, RedisDataType.REDIS_HLL, context);
      HyperLogLogPlus h = keyRegion.get(k);
      if (h != null) {
        hlls.add(h);
      }
    }

    if (hlls.isEmpty()) {
      // Nothing to merge. Remove the destination only when it held no sketch beforehand, so a
      // merge with absent sources never discards an existing destination sketch.
      // NOTE(review): removeKey is assumed to drop the key registered by checkAndSetDataType
      // together with any stored value; confirm against the region provider.
      if (existingDest == null) {
        context.getRegionProvider().removeKey(destKey);
      }
      command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), "OK"));
      return;
    }

    HyperLogLogPlus[] estimators = hlls.toArray(new HyperLogLogPlus[hlls.size()]);
    try {
      mergedHLL = (HyperLogLogPlus) mergedHLL.merge(estimators);
    } catch (CardinalityMergeException e) {
      throw new RuntimeException(e);
    }
    keyRegion.put(destKey, mergedHLL);
    command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), "OK"));
  }

}
/**
 * <p>
 * Encodes signed and unsigned values using a common variable-length scheme, found for example in
 * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google's Protocol
 * Buffers</a>. It uses fewer bytes to encode smaller values, but will use slightly more bytes to
 * encode large values.
 * </p>
 * <p>
 * Signed values are further encoded using so-called zig-zag encoding in order to make them
 * "compatible" with variable-length encoding.
 * </p>
 */
public final class Varint {

  /** Largest number of bytes a 32-bit value occupies: ceil(32 / 7). */
  private static final int MAX_VARINT_BYTES = 5;

  /** Bit offset of the fifth (last permitted) byte of a 32-bit value. */
  private static final int MAX_INT_SHIFT = 7 * (MAX_VARINT_BYTES - 1);

  private Varint() {}

  /**
   * Encodes a value using the variable-length encoding from
   * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol
   * Buffers</a>. It uses zig-zag encoding to efficiently encode signed values. If values are known
   * to be nonnegative, {@link #writeUnsignedVarLong(long, DataOutput)} should be used.
   *
   * @param value value to encode
   * @param out to write bytes to
   * @throws IOException if {@link DataOutput} throws {@link IOException}
   */
  public static void writeSignedVarLong(long value, DataOutput out) throws IOException {
    // Zig-zag: maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ... so small magnitudes stay short.
    writeUnsignedVarLong((value << 1) ^ (value >> 63), out);
  }

  /**
   * Encodes a value using the variable-length encoding from
   * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> Google Protocol
   * Buffers</a>. Zig-zag is not used, so input must not be negative. If values can be negative, use
   * {@link #writeSignedVarLong(long, DataOutput)} instead. This method treats negative input like
   * a large unsigned value.
   *
   * @param value value to encode
   * @param out to write bytes to
   * @throws IOException if {@link DataOutput} throws {@link IOException}
   */
  public static void writeUnsignedVarLong(long value, DataOutput out) throws IOException {
    // Emit seven bits at a time, low bits first, with the high bit flagging "more bytes follow".
    while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
      out.writeByte(((int) value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.writeByte((int) value & 0x7F);
  }

  /**
   * @see #writeSignedVarLong(long, DataOutput)
   */
  public static void writeSignedVarInt(int value, DataOutput out) throws IOException {
    writeUnsignedVarInt((value << 1) ^ (value >> 31), out);
  }

  /**
   * @see #writeUnsignedVarLong(long, DataOutput)
   */
  public static void writeUnsignedVarInt(int value, DataOutput out) throws IOException {
    while ((value & 0xFFFFFF80) != 0) {
      out.writeByte((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.writeByte(value & 0x7F);
  }

  /**
   * Zig-zag encodes {@code value} and returns its variable-length encoding as a new array.
   *
   * @param value value to encode
   * @return the encoded bytes, between one and five of them
   * @see #writeUnsignedVarInt(int)
   */
  public static byte[] writeSignedVarInt(int value) {
    return writeUnsignedVarInt((value << 1) ^ (value >> 31));
  }

  /**
   * Stream-free variant of {@link #writeUnsignedVarInt(int, DataOutput)}: encodes into a small
   * scratch array and returns a copy trimmed to the bytes actually used.
   *
   * @param value value to encode; negative input is treated like a large unsigned value
   * @return the encoded bytes, between one and five of them
   */
  public static byte[] writeUnsignedVarInt(int value) {
    byte[] scratch = new byte[MAX_VARINT_BYTES];
    int last = 0;
    while ((value & 0xFFFFFF80) != 0) {
      scratch[last++] = (byte) ((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    scratch[last] = (byte) (value & 0x7F);

    byte[] out = new byte[last + 1];
    System.arraycopy(scratch, 0, out, 0, out.length);
    return out;
  }

  /**
   * @param in to read bytes from
   * @return decoded value
   * @throws IOException if {@link DataInput} throws {@link IOException}
   * @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes
   *         have been read
   * @see #writeSignedVarLong(long, DataOutput)
   */
  public static long readSignedVarLong(DataInput in) throws IOException {
    long raw = readUnsignedVarLong(in);
    // This undoes the trick in writeSignedVarLong()
    long temp = (((raw << 63) >> 63) ^ raw) >> 1;
    // This extra step lets us deal with the largest signed values by treating
    // negative results from read unsigned methods like unsigned values.
    // Must re-flip the top bit if the original read value had it set.
    return temp ^ (raw & (1L << 63));
  }

  /**
   * @param in to read bytes from
   * @return decoded value
   * @throws IOException if {@link DataInput} throws {@link IOException}
   * @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes
   *         have been read
   * @see #writeUnsignedVarLong(long, DataOutput)
   */
  public static long readUnsignedVarLong(DataInput in) throws IOException {
    long value = 0L;
    int i = 0;
    long b;
    while (((b = in.readByte()) & 0x80L) != 0) {
      value |= (b & 0x7F) << i;
      i += 7;
      // Nine continuation bytes leave i == 63, the offset of the tenth and last legal byte.
      if (i > 63) {
        throw new IllegalArgumentException("Variable length quantity is too long");
      }
    }
    return value | (b << i);
  }

  /**
   * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have
   *         been read
   * @throws IOException if {@link DataInput} throws {@link IOException}
   * @see #readSignedVarLong(DataInput)
   */
  public static int readSignedVarInt(DataInput in) throws IOException {
    return decodeZigZag(readUnsignedVarInt(in));
  }

  /**
   * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have
   *         been read
   * @throws IOException if {@link DataInput} throws {@link IOException}
   * @see #readUnsignedVarLong(DataInput)
   */
  public static int readUnsignedVarInt(DataInput in) throws IOException {
    int value = 0;
    int i = 0;
    int b;
    while (((b = in.readByte()) & 0x80) != 0) {
      value |= (b & 0x7F) << i;
      i += 7;
      // A fifth continuation byte would place the next byte at offset 35, which an int shift
      // silently reduces modulo 32. Reject it instead of decoding garbage.
      if (i > MAX_INT_SHIFT) {
        throw new IllegalArgumentException("Variable length quantity is too long");
      }
    }
    return value | (b << i);
  }

  /**
   * Decodes a zig-zag, variable-length encoded value from the start of {@code bytes}.
   *
   * @param bytes the encoded value; bytes after the terminating byte are ignored
   * @return decoded value
   * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have
   *         been read
   * @see #writeSignedVarInt(int)
   */
  public static int readSignedVarInt(byte[] bytes) {
    return decodeZigZag(readUnsignedVarInt(bytes));
  }

  /**
   * Decodes a variable-length encoded value from the start of {@code bytes}.
   *
   * @param bytes the encoded value; bytes after the terminating byte are ignored
   * @return decoded value
   * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have
   *         been read
   * @see #writeUnsignedVarInt(int)
   */
  public static int readUnsignedVarInt(byte[] bytes) {
    int value = 0;
    int i = 0;
    // NOTE(review): when the array is empty or ends before a terminating byte, the result is
    // built from this placeholder or from the last continuation byte. That pre-existing
    // behaviour is kept; consider rejecting such input.
    byte rb = Byte.MIN_VALUE;
    for (byte b : bytes) {
      rb = b;
      if ((b & 0x80) == 0) {
        break;
      }
      value |= (b & 0x7f) << i;
      i += 7;
      // Same five-byte limit as the DataInput variant.
      if (i > MAX_INT_SHIFT) {
        throw new IllegalArgumentException("Variable length quantity is too long");
      }
    }
    return value | (rb << i);
  }

  /** Reverses the zig-zag mapping applied by the signed 32-bit writers. */
  private static int decodeZigZag(int raw) {
    // This undoes the trick in writeSignedVarInt()
    int temp = (((raw << 31) >> 31) ^ raw) >> 1;
    // This extra step lets us deal with the largest signed values by treating
    // negative results from read unsigned methods like unsigned values.
    // Must re-flip the top bit if the original read value had it set.
    return temp ^ (raw & (1 << 31));
  }

}
/**
 * Executor for the Redis {@code LINDEX} command: replies with the list element at the requested
 * index, where negative indexes count back from the tail.
 */
public class LIndexExecutor extends ListExecutor {

  private final String ERROR_NOT_NUMERIC = "The index provided is not numeric";

  /**
   * Replies with an arity error when fewer than three command elements were received, with an
   * error when the index is not numeric, with nil when the list or the element does not exist,
   * and otherwise with the element as a bulk string.
   *
   * @param command the client command; the response is set on it
   * @param context the execution context supplying the buffer allocator and the region lookup
   * @throws RuntimeException wrapping any exception raised while querying for the element
   */
  @Override
  public void executeCommand(Command command, ExecutionHandlerContext context) {
    List<byte[]> elements = command.getProcessedCommand();

    if (elements.size() < 3) {
      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.LINDEX));
      return;
    }

    ByteArrayWrapper key = command.getKey();
    byte[] rawIndex = elements.get(2);

    checkDataType(key, RedisDataType.REDIS_LIST, context);
    Region<Integer, ByteArrayWrapper> listRegion = getRegion(context, key);

    if (listRegion == null) {
      command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
      return;
    }

    // The region size is reduced by LIST_EMPTY_SIZE to obtain the number of list elements.
    int elementCount = listRegion.size() - LIST_EMPTY_SIZE;

    int position;
    try {
      position = Coder.bytesToInt(rawIndex);
    } catch (NumberFormatException e) {
      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_NOT_NUMERIC));
      return;
    }

    // Redis indexes are zero-based; a negative index counts from the tail, so shift it into the
    // zero-based range.
    if (position < 0) {
      position += elementCount;
    }

    // Still negative: the index reached back past the head of the list, so there is no element.
    if (position < 0) {
      command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
      return;
    }

    Struct entry;
    try {
      entry = getEntryAtIndex(context, key, position);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    if (entry == null) {
      command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
      return;
    }

    // The value is read from the second field of the returned struct.
    ByteArrayWrapper value = (ByteArrayWrapper) entry.getFieldValues()[1];
    command.setResponse(
        Coder.getBulkStringResponse(context.getByteBufAllocator(), value.toBytes()));
  }

  /**
   * Runs the {@code LINDEX} list query with {@code index + 1} as its parameter and picks the
   * result at position {@code index}.
   *
   * @return the struct at {@code index}, or {@code null} when the query yields no result or too
   *         few results
   */
  private Struct getEntryAtIndex(ExecutionHandlerContext context, ByteArrayWrapper key, int index)
      throws Exception {

    Query listQuery = getQuery(key, ListQuery.LINDEX, context);
    Object[] queryArgs = new Object[] {Integer.valueOf(index + 1)};

    SelectResults<?> rows = (SelectResults<?>) listQuery.execute(queryArgs);

    if (rows == null || rows.size() == 0 || rows.size() <= index) {
      return null;
    }
    return (Struct) rows.asList().get(index);
  }
}
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LInsertExecutor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; + +public class LInsertExecutor extends ListExecutor { + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), + "Unfortunately GemFireRedis server does not support LINSERT")); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LLenExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LLenExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LLenExecutor.java new file mode 100644 index 0000000..6abed92 --- /dev/null +++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LLenExecutor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.cache.Region; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisConstants.ArityDef; +import org.apache.geode.redis.internal.RedisDataType; + +import java.util.List; + +public class LLenExecutor extends ListExecutor { + + private final int NOT_EXISTS = 0; + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElems = command.getProcessedCommand(); + + if (commandElems.size() < 2) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.LLEN)); + return; + } + + ByteArrayWrapper key = command.getKey(); + + int listSize = 0; + + checkDataType(key, RedisDataType.REDIS_LIST, context); + Region<Integer, ByteArrayWrapper> keyRegion = getRegion(context, key); + + if (keyRegion == null) { + 
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_EXISTS)); + return; + } + + listSize = keyRegion.size() - LIST_EMPTY_SIZE; + + command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), listSize)); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPopExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPopExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPopExecutor.java new file mode 100644 index 0000000..40736c8 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPopExecutor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.redis.internal.RedisConstants.ArityDef; + + +public class LPopExecutor extends PopExecutor { + + @Override + protected ListDirection popType() { + return ListDirection.LEFT; + } + + @Override + public String getArgsError() { + return ArityDef.LPOP; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPushExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPushExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPushExecutor.java new file mode 100644 index 0000000..6624a61 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPushExecutor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.redis.internal.RedisConstants.ArityDef; + + +public class LPushExecutor extends PushExecutor { + + @Override + protected ListDirection pushType() { + return ListDirection.LEFT; + } + + @Override + public String getArgsError() { + return ArityDef.LPUSH; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPushXExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPushXExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPushXExecutor.java new file mode 100644 index 0000000..dc84896 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LPushXExecutor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.redis.internal.RedisConstants.ArityDef; + + +public class LPushXExecutor extends PushXExecutor { + + @Override + protected ListDirection pushType() { + return ListDirection.LEFT; + } + + @Override + public String getArgsError() { + return ArityDef.LPUSHX; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LRangeExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LRangeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LRangeExecutor.java new file mode 100644 index 0000000..38186ca --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LRangeExecutor.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.Struct; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisConstants.ArityDef; +import org.apache.geode.redis.internal.RedisDataType; +import org.apache.geode.redis.internal.executor.ListQuery; + +import java.util.List; + +public class LRangeExecutor extends ListExecutor { + + private final String ERROR_NOT_NUMERIC = "The index provided is not numeric"; + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElems = command.getProcessedCommand(); + + if (commandElems.size() < 4) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.LRANGE)); + return; + } + + ByteArrayWrapper key = command.getKey(); + byte[] startArray = commandElems.get(2); + byte[] stopArray = commandElems.get(3); + + int redisStart; + int redisStop; + + + checkDataType(key, RedisDataType.REDIS_LIST, context); + Region<Integer, ByteArrayWrapper> keyRegion = getRegion(context, key); + + if (keyRegion == null) { + command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator())); + return; + } + + int listSize = keyRegion.size() - LIST_EMPTY_SIZE; + if (listSize == 0) { + command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator())); + return; + } + + try { + redisStart = Coder.bytesToInt(startArray); + redisStop = Coder.bytesToInt(stopArray); + } catch (NumberFormatException e) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_NOT_NUMERIC)); + return; + } + + + redisStart = 
getBoundedStartIndex(redisStart, listSize); + redisStop = getBoundedEndIndex(redisStop, listSize); + if (redisStart > redisStop) { + command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator())); + return; + } + redisStart = Math.min(redisStart, listSize - 1); + redisStop = Math.min(redisStop, listSize - 1); + + + List<Struct> range; + try { + range = getRange(context, key, redisStart, redisStop, keyRegion); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (range == null) + command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator())); + else + command.setResponse( + Coder.getBulkStringArrayResponseOfValues(context.getByteBufAllocator(), range)); + } + + private List<Struct> getRange(ExecutionHandlerContext context, ByteArrayWrapper key, int start, + int stop, Region r) throws Exception { + + Query query = getQuery(key, ListQuery.LRANGE, context); + + Object[] params = {Integer.valueOf(stop + 1)}; + SelectResults<Struct> results = (SelectResults<Struct>) query.execute(params); + int size = results.size(); + if (results == null || size <= start) { + return null; + } + + return results.asList().subList(start, size); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LRemExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LRemExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LRemExecutor.java new file mode 100644 index 0000000..f9fbbf3 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LRemExecutor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
/**
 * Executor for the Redis {@code LREM} command: removes occurrences of a value from a list and
 * replies with the number of entries removed.
 */
public class LRemExecutor extends ListExecutor {

  private final String ERROR_NOT_NUMERIC = "The count provided is not numeric";

  /** Count reported when no region exists for the key. */
  private final int NOT_EXISTS = 0;

  /**
   * Replies with the number of entries removed, {@code 0} when the key has no region or nothing
   * matches, or an error for bad arity or a non-numeric count.
   *
   * @param command the command being executed; receives the response
   * @param context supplies the buffer allocator, region lookup and query lookup
   * @throws RuntimeException wrapping any exception raised while running the removal query
   */
  @Override
  public void executeCommand(Command command, ExecutionHandlerContext context) {
    List<byte[]> args = command.getProcessedCommand();
    if (args.size() < 4) {
      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.LREM));
      return;
    }

    ByteArrayWrapper key = command.getKey();
    byte[] rawCount = args.get(2);
    byte[] rawValue = args.get(3);

    checkDataType(key, RedisDataType.REDIS_LIST, context);
    Region<Integer, ByteArrayWrapper> listRegion = getRegion(context, key);
    if (listRegion == null) {
      command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_EXISTS));
      return;
    }

    int count;
    try {
      count = Coder.bytesToInt(rawCount);
    } catch (NumberFormatException e) {
      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_NOT_NUMERIC));
      return;
    }

    List<Struct> matches;
    try {
      matches = getRemoveList(context, key, new ByteArrayWrapper(rawValue), count);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    int removed = 0;
    if (matches != null) {
      for (Struct match : matches) {
        // The first field of each result row is the region key of the matching entry.
        Integer regionKey = (Integer) match.getFieldValues()[0];
        // Only count entries whose removal returned a previous value.
        if (listRegion.remove(regionKey) != null) {
          removed++;
        }
      }
    }
    command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), removed));
  }

  /**
   * Picks the removal query by the sign of {@code count} and executes it.
   *
   * @param context used to look up the query
   * @param key name of the list
   * @param value value whose occurrences should be removed
   * @param count zero selects {@code LREME} with only the value bound; a positive count selects
   *        {@code LREMG} and a negative count {@code LREML}, both with the value and the absolute
   *        count bound
   * @return the matching rows, or {@code null} when the query returns null or no rows
   * @throws Exception if executing the query fails
   */
  private List<Struct> getRemoveList(ExecutionHandlerContext context, ByteArrayWrapper key,
      ByteArrayWrapper value, int count) throws Exception {
    final Query query;
    final Object[] params;
    if (count == 0) {
      query = getQuery(key, ListQuery.LREME, context);
      params = new Object[] {value};
    } else if (count > 0) {
      query = getQuery(key, ListQuery.LREMG, context);
      params = new Object[] {value, Integer.valueOf(count)};
    } else {
      query = getQuery(key, ListQuery.LREML, context);
      params = new Object[] {value, Integer.valueOf(-count)};
    }

    SelectResults<Struct> results = (SelectResults<Struct>) query.execute(params);
    boolean nothingFound = results == null || results.isEmpty();
    return nothingFound ? null : results.asList();
  }
}
---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LSetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LSetExecutor.java new file mode 100644 index 0000000..e2f13cb --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LSetExecutor.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisConstants.ArityDef; +import org.apache.geode.redis.internal.RedisDataType; +import org.apache.geode.redis.internal.executor.ListQuery; + +import java.util.List; + +public class LSetExecutor extends ListExecutor { + + private final String ERROR_NOT_NUMERIC = "The index provided is not numeric"; + + private final String ERROR_INDEX = + "The index provided is not within range of this list or the key does not exist"; + + private final String SUCCESS = "OK"; + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElems = command.getProcessedCommand(); + + if (commandElems.size() < 4) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.LSET)); + return; + } + + ByteArrayWrapper key = command.getKey(); + byte[] indexArray = commandElems.get(2); + byte[] value = commandElems.get(3); + + int index; + + + checkDataType(key, RedisDataType.REDIS_LIST, context); + Region<Integer, ByteArrayWrapper> keyRegion = getRegion(context, key); + + if (keyRegion == null) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_INDEX)); + return; + } + + try { + index = Coder.bytesToInt(indexArray); + } catch (NumberFormatException e) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_NOT_NUMERIC)); + return; + } + + int listSize = keyRegion.size() - LIST_EMPTY_SIZE; + if (index < 0) + index += listSize; + if (index < 0 || index > listSize) { + 
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_INDEX)); + return; + } + + Integer indexKey; + try { + indexKey = getIndexKey(context, key, index); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (indexKey == null) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_INDEX)); + return; + } + if (index == listSize) + indexKey++; + keyRegion.put(indexKey, new ByteArrayWrapper(value)); + command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), SUCCESS)); + } + + private Integer getIndexKey(ExecutionHandlerContext context, ByteArrayWrapper key, int index) + throws Exception { + Query query = getQuery(key, ListQuery.LSET, context); + + Object[] params = {Integer.valueOf(index + 1)}; + + SelectResults<Integer> results = (SelectResults<Integer>) query.execute(params); + int size = results.size(); + if (results == null || size == 0) { + return null; + } + + return results.asList().get(size - 1); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LTrimExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LTrimExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LTrimExecutor.java new file mode 100644 index 0000000..1a7b369 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/LTrimExecutor.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisConstants.ArityDef; +import org.apache.geode.redis.internal.RedisDataType; +import org.apache.geode.redis.internal.executor.ListQuery; + +import java.util.List; + +public class LTrimExecutor extends ListExecutor { + + private final String ERROR_KEY_NOT_EXISTS = "The key does not exists on this server"; + + private final String ERROR_NOT_NUMERIC = "The index provided is not numeric"; + + private final String SUCCESS = "OK"; + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElems = command.getProcessedCommand(); + + if (commandElems.size() < 4) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.LTRIM)); + return; + } + + ByteArrayWrapper key = command.getKey(); + byte[] startArray = commandElems.get(2); + byte[] stopArray = commandElems.get(3); + + int redisStart; + int redisStop; + + + checkDataType(key, RedisDataType.REDIS_LIST, context); + Region keyRegion = getRegion(context, key); + + if (keyRegion == null) { + command + .setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), 
ERROR_KEY_NOT_EXISTS)); + return; + } + + int listSize = keyRegion.size() - LIST_EMPTY_SIZE; + if (listSize == 0) { + command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), SUCCESS)); + return; + } + + try { + redisStart = Coder.bytesToInt(startArray); + redisStop = Coder.bytesToInt(stopArray); + } catch (NumberFormatException e) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_NOT_NUMERIC)); + return; + } + + redisStart = getBoundedStartIndex(redisStart, listSize); + redisStop = getBoundedEndIndex(redisStop, listSize); + redisStart = Math.min(redisStart, listSize - 1); + redisStop = Math.min(redisStop, listSize - 1); + + if (redisStart == 0 && redisStop == listSize - 1) { + command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), SUCCESS)); + return; + } else if (redisStart == 0 && redisStop < redisStart) { + context.getRegionProvider().removeKey(key, RedisDataType.REDIS_LIST); + command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), SUCCESS)); + return; + } + + List<Integer> keepList; + try { + keepList = getRange(context, key, redisStart, redisStop, keyRegion); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (Object keyElement : keyRegion.keySet()) { + if (!keepList.contains(keyElement) && keyElement instanceof Integer) + keyRegion.remove(keyElement); + } + + // Reset indexes in meta data region + keyRegion.put("head", keepList.get(0)); + keyRegion.put("tail", keepList.get(keepList.size() - 1)); + command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), SUCCESS)); + } + + private List<Integer> getRange(ExecutionHandlerContext context, ByteArrayWrapper key, int start, + int stop, Region r) throws Exception { + Query query = getQuery(key, ListQuery.LTRIM, context); + + Object[] params = {Integer.valueOf(stop + 1)}; + + SelectResults<Integer> results = (SelectResults<Integer>) query.execute(params); + if 
(results == null || results.size() <= start) { + return null; + } + + return results.asList().subList(start, results.size()); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/ListExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/ListExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/ListExecutor.java new file mode 100644 index 0000000..26fc70d --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/ListExecutor.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisDataType; +import org.apache.geode.redis.internal.executor.AbstractExecutor; + +import java.util.List; + + +public abstract class ListExecutor extends AbstractExecutor { + + protected static final int LIST_EMPTY_SIZE = 2; + + protected static enum ListDirection { + LEFT, RIGHT + }; + + protected final static QueryService getQueryService() { + return GemFireCacheImpl.getInstance().getQueryService(); + } + + @SuppressWarnings("unchecked") + @Override + protected Region<Integer, ByteArrayWrapper> getOrCreateRegion(ExecutionHandlerContext context, + ByteArrayWrapper key, RedisDataType type) { + return (Region<Integer, ByteArrayWrapper>) context.getRegionProvider().getOrCreateRegion(key, + type, context); + } + + @SuppressWarnings("unchecked") + protected Region<Integer, ByteArrayWrapper> getRegion(ExecutionHandlerContext context, + ByteArrayWrapper key) { + return (Region<Integer, ByteArrayWrapper>) context.getRegionProvider().getRegion(key); + } + + /** + * Helper method to be used by the push commands to push elements onto a list. Because our current + * setup requires non trivial code to push elements in to a Region, I wanted all the push code to + * reside in one place. 
+ * + * @param key Name of the list + * @param commandElems Pieces of the command, this is where the elements that need to be pushed + * live + * @param startIndex The index to start with in the commandElems list, inclusive + * @param endIndex The index to end with in the commandElems list, exclusive + * @param keyRegion Region of list + * @param pushType ListDirection.LEFT || ListDirection.RIGHT + * @param context Context of this push + */ + protected void pushElements(ByteArrayWrapper key, List<byte[]> commandElems, int startIndex, + int endIndex, Region keyRegion, ListDirection pushType, ExecutionHandlerContext context) { + + String indexKey = pushType == ListDirection.LEFT ? "head" : "tail"; + String oppositeKey = pushType == ListDirection.RIGHT ? "head" : "tail"; + Integer index = (Integer) keyRegion.get(indexKey); + Integer opp = (Integer) keyRegion.get(oppositeKey); + if (index != opp) + index += pushType == ListDirection.LEFT ? -1 : 1; // Subtract index if left push, add if right + // push + + /** + * Multi push command + * + * For every element that needs to be added + */ + + for (int i = startIndex; i < endIndex; i++) { + byte[] value = commandElems.get(i); + ByteArrayWrapper wrapper = new ByteArrayWrapper(value); + + /** + * + * First, use the start index to attempt to insert the value into the Region + * + */ + + Object oldValue; + do { + oldValue = keyRegion.putIfAbsent(index, wrapper); + if (oldValue != null) { + index += pushType == ListDirection.LEFT ? -1 : 1; // Subtract index if left push, add if + // right push + } + } while (oldValue != null); + + /** + * + * Next, update the index in the meta data region. Keep trying to replace the existing index + * unless the index is further out than previously inserted, that's ok. 
Example below: + * + * ********************** LPUSH/LPUSH *************************** Push occurring at the same + * time, further index update first | This push | | | | V V [-4] [-3] [-2] [-1] [0] [1] [2] + * + * In this case, -4 would already exist in the meta data region, therefore we do not try to + * put -3 in the meta data region because a further index is already there. + * *************************************************************** + * + * Another example + * + * ********************** LPUSH/LPOP ***************************** This push | Simultaneous + * LPOP, meta data head index already updated to -2 | | | | V V [-4] [X] [-2] [-1] [0] [1] [2] + * + * In this case, -2 would already exist in the meta data region, but we need to make sure the + * element at -4 is visible to all other threads so we will attempt to change the index to -4 + * as long as it is greater than -4 + * *************************************************************** + * + */ + + boolean indexSet = false; + do { + Integer existingIndex = (Integer) keyRegion.get(indexKey); + if ((pushType == ListDirection.RIGHT && existingIndex < index) + || (pushType == ListDirection.LEFT && existingIndex > index)) + indexSet = keyRegion.replace(indexKey, existingIndex, index); + else + break; + } while (!indexSet); + + } + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PopExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PopExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PopExecutor.java new file mode 100644 index 0000000..935e7a1 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PopExecutor.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * 
agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.cache.Region; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.Extendable; +import org.apache.geode.redis.internal.RedisDataType; + +import java.util.List; + +public abstract class PopExecutor extends ListExecutor implements Extendable { + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElems = command.getProcessedCommand(); + + if (commandElems.size() < 2) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), getArgsError())); + return; + } + + ByteArrayWrapper key = command.getKey(); + + checkDataType(key, RedisDataType.REDIS_LIST, context); + Region keyRegion = getRegion(context, key); + + if (keyRegion == null || keyRegion.size() == LIST_EMPTY_SIZE) { + command.setResponse(Coder.getNilResponse(context.getByteBufAllocator())); + return; + } + + String indexKey = popType() == ListDirection.LEFT ? "head" : "tail"; + String oppositeKey = popType() == ListDirection.RIGHT ? 
"head" : "tail"; + Integer index = 0; + int originalIndex = index; + int incr = popType() == ListDirection.LEFT ? 1 : -1; + ByteArrayWrapper valueWrapper = null; + + /** + * + * First attempt to hop over an index by moving the index down one in the meta data region. The + * desired index to remove is held within the field index + * + */ + + boolean indexChanged = false; + do { + index = (Integer) keyRegion.get(indexKey); + Integer opp = (Integer) keyRegion.get(oppositeKey); + if (index.equals(opp)) + break; + indexChanged = keyRegion.replace(indexKey, index, index + incr); + } while (!indexChanged); + + /** + * + * Now attempt to remove the value of the index. We must do a get to ensure a returned value and + * then call remove with the value to ensure no one else has removed it first. Otherwise, try + * other indexes + * + */ + + boolean removed = false; + int i = 0; + do { + valueWrapper = (ByteArrayWrapper) keyRegion.get(index); + if (valueWrapper != null) + removed = keyRegion.remove(index, valueWrapper); + + /** + * + * If remove has passed, our job is done and we can break and stop looking for a value + * + */ + + if (removed) + break; + + /** + * + * If the index has not been removed, we need to look at other indexes. Two cases exist: + * + * ************************** FIRST MISS *********************************** Push occurring at + * the same time, further index update first | This is location of miss | | | | V V [-4] [X] + * [-2] [-1] [0] [1] [2] <-- Direction of index update If this is the first miss, the index is + * re obtained from the meta region and that index is trying. 
However, if the index in the + * meta data region is not further out, that index is not used and moves on to the second case + * ************************************************************************** + * + * ************************* SUBSEQUENT MISSES ****************************** Push occurring + * at the same time, further index update first | This is location of miss | | | | V V [-4] + * [X] [-2] [-1] [0] [1] [2] Direction of index update --> If this is not the first miss then + * we move down to the other end of the list which means the next not empty index will be + * attempted to be removed + * ************************************************************************** + * + * If it is the case that the list is empty, it will exit this loop + * + */ + + index += incr; + Integer metaIndex = (Integer) keyRegion.get(indexKey); + if (i < 1 && (popType() == ListDirection.LEFT && metaIndex < originalIndex + || popType() == ListDirection.RIGHT && metaIndex > originalIndex)) + index = metaIndex; + i++; + } while (!removed && keyRegion.size() != LIST_EMPTY_SIZE); + if (valueWrapper != null) + command.setResponse( + Coder.getBulkStringResponse(context.getByteBufAllocator(), valueWrapper.toBytes())); + else + command.setResponse(Coder.getNilResponse(context.getByteBufAllocator())); + } + + protected abstract ListDirection popType(); + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PushExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PushExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PushExecutor.java new file mode 100644 index 0000000..ace592a --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PushExecutor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.cache.Region; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.Extendable; +import org.apache.geode.redis.internal.RedisDataType; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class PushExecutor extends PushXExecutor implements Extendable { + + private final int START_VALUES_INDEX = 2; + static volatile AtomicInteger puts = new AtomicInteger(0); + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElems = command.getProcessedCommand(); + + if (commandElems.size() < 3) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), getArgsError())); + return; + } + + ByteArrayWrapper key = command.getKey(); + + Region<Integer, ByteArrayWrapper> keyRegion = + getOrCreateRegion(context, key, RedisDataType.REDIS_LIST); + pushElements(key, commandElems, START_VALUES_INDEX, commandElems.size(), keyRegion, pushType(), + 
context); + int listSize = keyRegion.size() - LIST_EMPTY_SIZE; + command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), listSize)); + } + + protected abstract ListDirection pushType(); + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PushXExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PushXExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PushXExecutor.java new file mode 100644 index 0000000..251ae0d --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/PushXExecutor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.cache.Region; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.Extendable; +import org.apache.geode.redis.internal.RedisDataType; + +import java.util.List; + +public abstract class PushXExecutor extends ListExecutor implements Extendable { + + private final int NOT_EXISTS = 0; + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElems = command.getProcessedCommand(); + + if (commandElems.size() < 3) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), getArgsError())); + return; + } + + ByteArrayWrapper key = command.getKey(); + + Region<Integer, ByteArrayWrapper> keyRegion = getRegion(context, key); + if (keyRegion == null) { + command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_EXISTS)); + return; + } + checkDataType(key, RedisDataType.REDIS_LIST, context); + pushElements(key, commandElems, 2, 3, keyRegion, pushType(), context); + + int listSize = keyRegion.size() - LIST_EMPTY_SIZE; + + command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), listSize)); + } + + protected abstract ListDirection pushType(); + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPopExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPopExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPopExecutor.java new file mode 100644 index 0000000..3d25ae6 --- /dev/null +++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPopExecutor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.redis.internal.RedisConstants.ArityDef; + + +public class RPopExecutor extends PopExecutor { + + @Override + protected ListDirection popType() { + return ListDirection.RIGHT; + } + + @Override + public String getArgsError() { + return ArityDef.RPOP; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPushExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPushExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPushExecutor.java new file mode 100644 index 0000000..04f6bfd --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPushExecutor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.redis.internal.RedisConstants.ArityDef; + + +public class RPushExecutor extends PushExecutor { + + @Override + protected ListDirection pushType() { + return ListDirection.RIGHT; + } + + @Override + public String getArgsError() { + return ArityDef.RPUSH; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPushXExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPushXExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPushXExecutor.java new file mode 100644 index 0000000..8d1a007 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/list/RPushXExecutor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.list; + +import org.apache.geode.redis.internal.RedisConstants.ArityDef; + + +public class RPushXExecutor extends PushXExecutor { + + @Override + protected ListDirection pushType() { + return ListDirection.RIGHT; + } + + @Override + public String getArgsError() { + return ArityDef.RPUSHX; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/c6dbc6d4/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java ---------------------------------------------------------------------- diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java new file mode 100644 index 0000000..18cf1b6 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.set; + +import org.apache.geode.cache.Region; +import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.Command; +import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisConstants.ArityDef; +import org.apache.geode.redis.internal.RedisDataType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SAddExecutor extends SetExecutor { + + @Override + public void executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> commandElems = command.getProcessedCommand(); + + if (commandElems.size() < 3) { + command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.SADD)); + return; + } + + ByteArrayWrapper key = command.getKey(); + @SuppressWarnings("unchecked") + Region<ByteArrayWrapper, Boolean> keyRegion = (Region<ByteArrayWrapper, Boolean>) context + .getRegionProvider().getOrCreateRegion(key, RedisDataType.REDIS_SET, context); + + if (commandElems.size() >= 4) { + Map<ByteArrayWrapper, Boolean> entries = new HashMap<ByteArrayWrapper, Boolean>(); + for (int i = 2; i < commandElems.size(); i++) + entries.put(new ByteArrayWrapper(commandElems.get(i)), true); + + keyRegion.putAll(entries); + command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), entries.size())); + } else { + Object v = keyRegion.put(new ByteArrayWrapper(commandElems.get(2)), true); + command + 
.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), v == null ? 1 : 0)); + } + + } + +}
