Fix version compatibility issue for AbstractRedisInputOperator by reverting it to its original superclass, AbstractKeyValueStoreInputOperator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/731b8bbe Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/731b8bbe Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/731b8bbe Branch: refs/heads/feature-AppData Commit: 731b8bbe3b09ac54a5c5a985e1e76667cc417002 Parents: 0b31fee Author: ishark <[email protected]> Authored: Tue Aug 18 13:36:21 2015 -0700 Committer: ishark <[email protected]> Committed: Tue Aug 18 13:36:21 2015 -0700 ---------------------------------------------------------------------- .../contrib/redis/AbstractRedisInputOperator.java | 7 +++++-- .../contrib/redis/RedisKeyValueInputOperator.java | 9 +++++++++ .../contrib/redis/RedisMapAsValueInputOperator.java | 8 ++++++++ .../datatorrent/contrib/redis/RedisPOJOInputOperator.java | 7 +++++++ 4 files changed, 29 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index 260fbf6..5e62dbb 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -19,13 +19,16 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; + import javax.validation.constraints.NotNull; + import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; + import com.datatorrent.api.Operator.CheckpointListener; import 
com.datatorrent.api.Context.OperatorContext; import com.datatorrent.netlet.util.DTThrowable; -import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; import com.datatorrent.lib.io.IdempotentStorageManager; /** @@ -39,7 +42,7 @@ import com.datatorrent.lib.io.IdempotentStorageManager; * The tuple type. * @since 0.9.3 */ -public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener +public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> implements CheckpointListener { protected transient List<String> keys = new ArrayList<String>(); protected transient Integer scanOffset; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java index 8f419bd..0d0efe8 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java @@ -18,6 +18,8 @@ package com.datatorrent.contrib.redis; import java.util.ArrayList; import java.util.List; +import java.util.Map; + import com.datatorrent.lib.util.KeyValPair; /** @@ -52,4 +54,11 @@ public class RedisKeyValueInputOperator extends AbstractRedisInputOperator<KeyVa keysObjectList.clear(); } } + + @Override + public KeyValPair<String, String> convertToTuple(Map<Object, Object> o) + { + // Do nothing for the override, Scan already done in processTuples + return null; + } } 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java index 66ef582..a7f0cd2 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java @@ -16,6 +16,7 @@ package com.datatorrent.contrib.redis; import java.util.Map; + import com.datatorrent.lib.util.KeyValPair; /** @@ -42,4 +43,11 @@ public class RedisMapAsValueInputOperator extends AbstractRedisInputOperator<Key } keys.clear(); } + + @Override + public KeyValPair<String, Map<String, String>> convertToTuple(Map<Object, Object> o) + { + // Do nothing for the override, Emit already handled in processTuples + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/731b8bbe/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java index 5a73e61..ac3f7fc 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java @@ -201,4 +201,11 @@ public class RedisPOJOInputOperator extends AbstractRedisInputOperator<KeyValPai { this.dataColumns = dataColumns; } + + @Override + public KeyValPair<String, Object> convertToTuple(Map<Object, Object> o) + { + // Do nothing for the override, Scan already done in processTuples + return null; + } }
