MLHR-1748 #resolve Created concrete input and output operators for Redis Store. Added test cases for the same.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a57a3d75 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a57a3d75 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a57a3d75 Branch: refs/heads/master Commit: a57a3d756bafc1269c827a71d4fea549abdf65dd Parents: f40ba34 Author: ishark <[email protected]> Authored: Mon Jun 29 15:52:25 2015 -0700 Committer: ishark <[email protected]> Committed: Fri Aug 14 12:07:36 2015 -0700 ---------------------------------------------------------------------- contrib/pom.xml | 8 +- .../redis/AbstractRedisInputOperator.java | 224 +++++++++++++++++- .../redis/RedisKeyValueInputOperator.java | 55 +++++ .../redis/RedisMapAsValueInputOperator.java | 45 ++++ .../contrib/redis/RedisPOJOInputOperator.java | 204 ++++++++++++++++ .../contrib/redis/RedisPOJOOutputOperator.java | 155 +++++++++++++ .../datatorrent/contrib/redis/RedisStore.java | 27 +++ .../contrib/redis/RedisInputOperatorTest.java | 193 ++++++++++++++++ .../contrib/redis/RedisPOJOOperatorTest.java | 230 +++++++++++++++++++ demos/machinedata/pom.xml | 2 +- 10 files changed, 1138 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 9776e2f..50d7234 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -181,6 +181,12 @@ <dependencies> <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>2.7.8</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> @@ -382,7 +388,7 @@ <dependency> <groupId>redis.clients</groupId> 
<artifactId>jedis</artifactId> - <version>2.2.1</version> + <version>2.5.1</version> <optional>true</optional> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index ff7a9a5..7f79bd0 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -15,11 +15,22 @@ */ package com.datatorrent.contrib.redis; -import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import javax.validation.constraints.NotNull; +import redis.clients.jedis.ScanParams; +import redis.clients.jedis.ScanResult; +import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.netlet.util.DTThrowable; +import com.datatorrent.lib.db.AbstractStoreInputOperator; +import com.datatorrent.lib.io.IdempotentStorageManager; /** * This is the base implementation of a Redis input operator. - * <p></p> + * * @displayName Abstract Redis Input * @category Input * @tags redis, key value @@ -27,6 +38,213 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; * @param <T> The tuple type. 
* @since 0.9.3 */ -public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStoreInputOperator<T, RedisStore> +public abstract class AbstractRedisInputOperator<T> extends AbstractStoreInputOperator<T, RedisStore> implements CheckpointListener { + /** Keys returned by the most recent scan; processed (and cleared) by processTuples() in the concrete operators. */ + protected transient List<String> keys = new ArrayList<String>(); + /** Cursor passed to the next Redis scan call. */ + protected transient Integer scanOffset; + protected transient ScanParams scanParameters; + private transient boolean scanComplete; + private transient Integer backupOffset; + private int scanCount; + private transient boolean replay; + + @NotNull + private IdempotentStorageManager idempotentStorageManager; + + private transient OperatorContext context; + private transient long currentWindowId; + private transient Integer sleepTimeMillis; + private transient Integer scanCallsInCurrentWindow; + private RecoveryState recoveryState; + + /* + * Recovery State contains the scan cursor reached at the end of a window and the number of times + * ScanKeys was invoked in that window. We need to capture the number of + * calls to ScanKeys because the cursor returned by a ScanKeys call is not + * always monotonically increasing.
 Storing the cursor and the number of scan calls + * for each window guarantees idempotency for each window. + */ + public static class RecoveryState implements Serializable + { + public Integer scanOffsetAtBeginWindow, numberOfScanCallsInWindow; + } + + public AbstractRedisInputOperator() + { + scanCount = 100; + recoveryState = new RecoveryState(); + recoveryState.scanOffsetAtBeginWindow = 0; + recoveryState.numberOfScanCallsInWindow = 0; + setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager()); + } + + /** + * Resets the per-window scan counter and, for windows up to the largest recovery + * window, restores the saved scan state so that the window is replayed identically. + */ + @Override + public void beginWindow(long windowId) + { + currentWindowId = windowId; + scanCallsInCurrentWindow = 0; + replay = false; + if (currentWindowId <= getIdempotentStorageManager().getLargestRecoveryWindow()) { + replay(windowId); + } + } + + /** + * Loads the begin cursor (from window windowId - 1, when it was saved) and the number of + * scan calls recorded for windowId, then switches the operator into replay mode. + */ + private void replay(long windowId) + { + try { + if (checkIfWindowExistsInIdempotencyManager(windowId - 1)) { + // Begin offset for this window is recovery offset stored for the last + // window + RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1); + recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow; + } + + RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId); + recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow; + if (recoveryState.scanOffsetAtBeginWindow != null) { + scanOffset = recoveryState.scanOffsetAtBeginWindow; + } + replay = true; + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + /** + * Returns true when windowId lies between the first and last window ids saved for this operator. + */ + private boolean checkIfWindowExistsInIdempotencyManager(long windowId) throws IOException + { + long[] windowsIds = getIdempotentStorageManager().getWindowIds(context.getId()); + return windowsIds.length > 0 && windowId >= windowsIds[0] && windowId <= windowsIds[windowsIds.length - 1]; + } + + /** + * Fetches the next batch of keys with ScanKeys unless the scan is complete. While replaying, + * no further scans are issued once the number of calls recorded for the window is reached. + */ + private void
scanKeysFromOffset() + { + if (!scanComplete) { + if (replay && scanCallsInCurrentWindow >= recoveryState.numberOfScanCallsInWindow) { + try { + Thread.sleep(sleepTimeMillis); + } catch (InterruptedException e) { + // restore the interrupt status before propagating + Thread.currentThread().interrupt(); + DTThrowable.rethrow(e); + } + return; + } + + ScanResult<String> result = store.ScanKeys(scanOffset, scanParameters); + backupOffset = scanOffset; + scanOffset = Integer.parseInt(result.getStringCursor()); + if (scanOffset == 0) { + // Redis store returns 0 after all data is read + scanComplete = true; + + // point scanOffset to the end in this case for reading any new tuples + // NOTE(review): Redis cursors are opaque, so this sum is not a position in the key space, and + // scanComplete is only reset in setup(), so keys added later are not picked up - confirm intent. + scanOffset = backupOffset + result.getResult().size(); + } + keys = result.getResult(); + } + scanCallsInCurrentWindow++; + } + + /** + * Initializes the scan parameters and the idempotent storage manager and resumes scanning + * from the cursor held in the checkpointed recovery state. + */ + @Override + public void setup(OperatorContext context) + { + super.setup(context); + sleepTimeMillis = context.getValue(OperatorContext.SPIN_MILLIS); + getIdempotentStorageManager().setup(context); + this.context = context; + scanComplete = false; + scanParameters = new ScanParams(); + scanParameters.count(scanCount); + // For the 1st window after checkpoint, windowID - 1 would not have recovery + // offset stored in idempotentStorageManager + // But recoveryOffset is non-transient, so will be recovered with + // checkPointing + scanOffset = recoveryState.scanOffsetAtBeginWindow; + } + + /** + * While replaying, keeps scanning until the recorded number of scan calls is reached; then records + * this window's cursor and scan-call count and saves them for windows beyond the largest recovery window. + */ + @Override + public void endWindow() + { + while (replay && scanCallsInCurrentWindow < recoveryState.numberOfScanCallsInWindow) { + // If less keys got scanned in this window, scan till recovery offset + scanKeysFromOffset(); + processTuples(); + } + super.endWindow(); + recoveryState.scanOffsetAtBeginWindow = scanOffset; + recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow; + + if (currentWindowId > getIdempotentStorageManager().getLargestRecoveryWindow()) { + try { + getIdempotentStorageManager().save(recoveryState, context.getId(), currentWindowId); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + } + + @Override + public void
teardown() + { + super.teardown(); + getIdempotentStorageManager().teardown(); + } + + /** + * Gets the number of keys requested (the COUNT value) in each redis key scan. + */ + public int getScanCount() + { + return scanCount; + } + + /** + * Sets the number of keys requested (the COUNT value) in each redis key scan. + */ + public void setScanCount(int scanCount) + { + this.scanCount = scanCount; + } + + /** + * Scans the next batch of keys and hands them to processTuples(). + */ + @Override + public void emitTuples() + { + scanKeysFromOffset(); + processTuples(); + } + + /** + * Emits tuples for the keys currently held in {@code keys}; called after every scan. + */ + public abstract void processTuples(); + + @Override + public void checkpointed(long windowId) + { + } + + /** + * Purges the recovery state saved for windows up to the committed window. + */ + @Override + public void committed(long windowId) + { + try { + getIdempotentStorageManager().deleteUpTo(context.getId(), windowId); + } catch (IOException e) { + throw new RuntimeException("committing", e); + } + } + + /** + * Gets the idempotent storage manager used to save and replay per-window scan state. + */ + public IdempotentStorageManager getIdempotentStorageManager() + { + return idempotentStorageManager; + } + + /** + * Sets the idempotent storage manager used to save and replay per-window scan state. + */ + public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + { + this.idempotentStorageManager = idempotentStorageManager; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java new file mode 100644 index 0000000..8f419bd --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datatorrent.contrib.redis; + +import java.util.ArrayList; +import java.util.List; +import com.datatorrent.lib.util.KeyValPair; + +/** + * This is an implementation of a Redis input operator for fetching + * Key-Value pairs stored in Redis. For every key found by the scan of the store it + * emits the corresponding <Key, Value> pair. Value data type is String in this case. + * + * @displayName Redis Input Operator for Key Value pair + * @category Store + * @tags input operator, key value + * + */ +public class RedisKeyValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, String>> +{ + /** + * Fetches the values of all keys returned by the latest scan with a single getAll call + * and emits one pair per key; a null value from the store is emitted as a null value. + * The key list is cleared afterwards. + */ + @Override + public void processTuples() + { + if (keys.isEmpty()) { + return; + } + // scratch list is a local so it is not carried in checkpointed operator state + List<Object> keysObjectList = new ArrayList<Object>(keys); + List<Object> allValues = store.getAll(keysObjectList); + int count = Math.min(allValues.size(), keys.size()); + for (int i = 0; i < count; i++) { + Object value = allValues.get(i); + outputPort.emit(new KeyValPair<String, String>(keys.get(i), value == null ? null : value.toString())); + } + keys.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java
b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java new file mode 100644 index 0000000..66ef582 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.contrib.redis; + +import java.util.Map; +import com.datatorrent.lib.util.KeyValPair; + +/** + * This is an implementation of a Redis input operator. For the keys found by the + * scan of the store it emits the values that are stored as Maps in Redis, i.e.
 when the value datatype in + * Redis is a hash + * + * @displayName Redis Input Operator for Map + * @category Store + * @tags input operator, key value + * + */ + +public class RedisMapAsValueInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Map<String, String>>> +{ + /** + * Emits a <key, field map> pair for every scanned key whose Redis type is "hash"; + * keys of any other type are skipped. The key list is cleared afterwards. + */ + @Override + public void processTuples() + { + for (String hashKey : keys) { + String redisType = store.getType(hashKey); + if (!redisType.equals("hash")) { + continue; + } + outputPort.emit(new KeyValPair<String, Map<String, String>>(hashKey, store.getMap(hashKey))); + } + keys.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java new file mode 100644 index 0000000..5a73e61 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOInputOperator.java @@ -0,0 +1,204 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.datatorrent.contrib.redis; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Setter; +import com.datatorrent.lib.util.PojoUtils.SetterBoolean; +import com.datatorrent.lib.util.PojoUtils.SetterDouble; +import com.datatorrent.lib.util.PojoUtils.SetterFloat; +import com.datatorrent.lib.util.PojoUtils.SetterInt; +import com.datatorrent.lib.util.PojoUtils.SetterLong; +import com.datatorrent.lib.util.PojoUtils.SetterShort; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * This is a Redis input operator, which scans all keys in the Redis store. It + * converts values stored as maps to Plain Old Java Objects. It outputs + * a KeyValPair with the POJO as value. + * <p> + * This input adapter reads <Key, Map> entries from the RedisStore and outputs a + * Key value pair <key, POJO> as tuples.
+ * </p> + * + * @displayName Redis POJO Input Operator + * @category Store + * @tags input operator, key value + * + */ +@Evolving +public class RedisPOJOInputOperator extends AbstractRedisInputOperator<KeyValPair<String, Object>> +{ + protected final Map<String, Object> map = new HashMap<String, Object>(); + private ArrayList<FieldInfo> dataColumns; + private transient ArrayList<Object> setters; + // transient together with setters: after restore from a checkpoint both start fresh, so the setters are rebuilt + private transient boolean isFirstTuple = true; + private String outputClass; + private Class<?> objectClass; + + public RedisPOJOInputOperator() + { + super(); + setters = new ArrayList<Object>(); + } + + /** + * Creates an instance of the output class and populates it from the hash fields using the + * setters built in processFirstTuple(). Columns absent from the hash are left at the object's + * default value. Conversion failures are rethrown instead of emitting a null object. + */ + @SuppressWarnings("unchecked") + private Object convertMapToObject(Map<String, String> tuple) + { + try { + Object mappedObject = objectClass.newInstance(); + for (int i = 0; i < dataColumns.size(); i++) { + final SupportType type = dataColumns.get(i).getType(); + final String columnName = dataColumns.get(i).getColumnName(); + + if (i < setters.size()) { + String value = tuple.get(columnName); + if (value == null) { + continue; + } + switch (type) { + case STRING: + ((Setter<Object, String>) setters.get(i)).set(mappedObject, value); + break; + case BOOLEAN: + ((SetterBoolean) setters.get(i)).set(mappedObject, Boolean.parseBoolean(value)); + break; + case SHORT: + ((SetterShort) setters.get(i)).set(mappedObject, Short.parseShort(value)); + break; + case INTEGER: + ((SetterInt) setters.get(i)).set(mappedObject, Integer.parseInt(value)); + break; + case LONG: + ((SetterLong) setters.get(i)).set(mappedObject, Long.parseLong(value)); + break; + case FLOAT: + ((SetterFloat) setters.get(i)).set(mappedObject, Float.parseFloat(value)); + break; + case DOUBLE: + ((SetterDouble) setters.get(i)).set(mappedObject, Double.parseDouble(value)); + break; + default: + break; + } + } + } + return mappedObject; + } catch (Exception e) { + DTThrowable.rethrow(e); + } + return null; + } + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + } + + /** + * Loads the output class and builds one setter per data column. + * + * @param value the first hash read from the store (not used to build the setters) + * @throws ClassNotFoundException if the output class cannot be loaded + */ + public void processFirstTuple(Map<String, String>
 value) throws ClassNotFoundException + { + objectClass = Class.forName(getOutputClass()); + // start from an empty list so that calling this method again does not duplicate setters + setters.clear(); + + final int size = dataColumns.size(); + for (int i = 0; i < size; i++) { + final SupportType type = dataColumns.get(i).getType(); + final String fieldExpression = dataColumns.get(i).getPojoFieldExpression(); + final Object setter; + switch (type) { + case STRING: + setter = PojoUtils.createSetter(objectClass, fieldExpression, String.class); + break; + case BOOLEAN: + setter = PojoUtils.createSetterBoolean(objectClass, fieldExpression); + break; + case SHORT: + setter = PojoUtils.createSetterShort(objectClass, fieldExpression); + break; + case INTEGER: + setter = PojoUtils.createSetterInt(objectClass, fieldExpression); + break; + case LONG: + setter = PojoUtils.createSetterLong(objectClass, fieldExpression); + break; + case FLOAT: + setter = PojoUtils.createSetterFloat(objectClass, fieldExpression); + break; + case DOUBLE: + setter = PojoUtils.createSetterDouble(objectClass, fieldExpression); + break; + default: + setter = PojoUtils.createSetter(objectClass, fieldExpression, Object.class); + break; + } + setters.add(setter); + } + } + + /** + * Converts every scanned key whose Redis type is "hash" into an instance of the output class + * and emits it with its key; other keys are skipped. The key list is cleared afterwards. + */ + @Override + public void processTuples() + { + for (String key : keys) { + if (store.getType(key).equals("hash")) { + Map<String, String> mapValue = store.getMap(key); + if (isFirstTuple) { + try { + processFirstTuple(mapValue); + } catch (ClassNotFoundException e) { + DTThrowable.rethrow(e); + } + } + isFirstTuple = false; + outputPort.emit(new KeyValPair<String, Object>(key, convertMapToObject(mapValue))); + } + } + keys.clear(); + } + + /** + * Gets the fully qualified name of the class that stored maps are converted to. + */ + public String getOutputClass() + { + return outputClass; + } + + /** + * Sets the fully qualified name of the class that stored maps are converted to. + */ + public void setOutputClass(String outputClass) + { + this.outputClass = outputClass; + } + + /** + * Data columns to read from the Map stored in Redis.
 Each entry gives the + * column name, the POJO field expression and the column data type. + */ + public ArrayList<FieldInfo> getDataColumns() + { + return dataColumns; + } + + public void setDataColumns(ArrayList<FieldInfo> dataColumns) + { + this.dataColumns = dataColumns; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java new file mode 100644 index 0000000..8966248 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisPOJOOutputOperator.java @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.contrib.redis; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; +import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; + +/** + * This is a Redis output operator, which takes a Key and corresponding Plain + * Old Java Object as input.
And writes a Map out to Redis based on Expressions + * provided. + * <p> + * This output adapter takes a Key value pair <key, POJO> as tuples and just + * writes to the redis store with the key and the value is a Map containing + * object attributes as <keys,value> Note: Redis output operator should never + * use the passthrough method because it begins a transaction at beginWindow and + * commits a transaction at endWindow, and a transaction in Redis blocks all + * other clients. + * </p> + * + * @displayName Redis POJO Output Operator + * @category Store + * @tags output operator, key value + * + */ +public class RedisPOJOOutputOperator extends AbstractRedisAggregateOutputOperator<KeyValPair<String, Object>> +{ + protected final Map<String, Object> map = new HashMap<String, Object>(); + private ArrayList<FieldInfo> dataColumns; + private transient ArrayList<Object> getters; + // transient together with getters: after restore from a checkpoint both start fresh, so the getters are rebuilt + private transient boolean isFirstTuple = true; + + public RedisPOJOOutputOperator() + { + super(); + getters = new ArrayList<Object>(); + } + + /** + * Writes every buffered <key, POJO> pair to the store as a Map and then empties the buffer, + * so later windows do not rewrite entries that were already stored. + */ + @Override + public void storeAggregate() + { + for (Entry<String, Object> entry : map.entrySet()) { + Map<String, String> mapObject = convertObjectToMap(entry.getValue()); + store.put(entry.getKey(), mapObject); + } + map.clear(); + } + + /** + * Reads each data column from the POJO with the getter built for its type and stores its + * string form under the column name. Null fields are omitted rather than dereferenced. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private Map<String, String> convertObjectToMap(Object tuple) + { + Map<String, String> mappedObject = new HashMap<String, String>(); + for (int i = 0; i < dataColumns.size() && i < getters.size(); i++) { + final SupportType type = dataColumns.get(i).getType(); + final String columnName = dataColumns.get(i).getColumnName(); + final Object getter = getters.get(i); + final Object value; + switch (type) { + case BOOLEAN: + value = ((PojoUtils.GetterBoolean) getter).get(tuple); + break; + case SHORT: + value = ((PojoUtils.GetterShort) getter).get(tuple); + break; + case LONG: + value = ((PojoUtils.GetterLong) getter).get(tuple); + break; + case FLOAT: + value = ((PojoUtils.GetterFloat) getter).get(tuple); + break; + case DOUBLE: + value = ((PojoUtils.GetterDouble) getter).get(tuple); + break; + default: + // STRING, INTEGER and all other types are built with the generic Getter in processFirstTuple + value = ((Getter<Object, Object>) getter).get(tuple); + break; + } + if (value != null) { + mappedObject.put(columnName, value.toString()); + } + } + return mappedObject; + } + + /** + * Builds one getter per data column from the class of the first tuple's value. + */ + public void processFirstTuple(KeyValPair<String, Object> tuple) + { + // Create getters using the value of the first tuple + Object value =
 tuple.getValue(); + // start from an empty list so that calling this method again does not duplicate getters + getters.clear(); + + final Class<?> fqcn = value.getClass(); + final int size = dataColumns.size(); + for (int i = 0; i < size; i++) { + final SupportType type = dataColumns.get(i).getType(); + final String getterExpression = dataColumns.get(i).getPojoFieldExpression(); + final Object getter; + switch (type) { + case STRING: + getter = PojoUtils.createGetter(fqcn, getterExpression, String.class); + break; + case BOOLEAN: + getter = PojoUtils.createGetterBoolean(fqcn, getterExpression); + break; + case SHORT: + getter = PojoUtils.createGetterShort(fqcn, getterExpression); + break; + case INTEGER: + getter = PojoUtils.createGetter(fqcn, getterExpression, type.getJavaType()); + break; + case LONG: + getter = PojoUtils.createGetterLong(fqcn, getterExpression); + break; + case FLOAT: + getter = PojoUtils.createGetterFloat(fqcn, getterExpression); + break; + case DOUBLE: + getter = PojoUtils.createGetterDouble(fqcn, getterExpression); + break; + default: + getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class); + break; + } + getters.add(getter); + } + } + + /** + * Buffers the tuple for storeAggregate(); the getters are built from the first tuple seen. + */ + @Override + public void processTuple(KeyValPair<String, Object> tuple) + { + if (isFirstTuple) { + processFirstTuple(tuple); + } + + isFirstTuple = false; + map.put(tuple.getKey(), tuple.getValue()); + } + + /** + * Data columns to be written to the Map stored in Redis.
 Each entry gives the + * column name, the POJO field expression and the column data type. + */ + public ArrayList<FieldInfo> getDataColumns() + { + return dataColumns; + } + + public void setDataColumns(ArrayList<FieldInfo> dataColumns) + { + this.dataColumns = dataColumns; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java index ea8e26b..2acc1d5 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java @@ -23,6 +23,8 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; +import redis.clients.jedis.ScanParams; +import redis.clients.jedis.ScanResult; import redis.clients.jedis.Transaction; import com.datatorrent.lib.db.TransactionableKeyValueStore; @@ -181,6 +183,26 @@ public class RedisStore implements TransactionableKeyValueStore return jedis.get(key.toString()); } + /** + * Returns the Redis type name of the value stored at the key, as reported by the TYPE command. + */ + public String getType(String key) + { + if (isInTransaction()) { + throw new RuntimeException("Cannot call type when in redis transaction"); + } + return jedis.type(key); + } + + /** + * Gets the stored Map for the given key, when the value data type is a map stored with hmset. + * + * @param key the key to look up + * @return hashmap stored for the key. + */ + public Map<String, String> getMap(Object key) + { + if (isInTransaction()) { + throw new RuntimeException("Cannot call get when in redis transaction"); + } + return jedis.hgetAll(key.toString()); + } + + + /** + * Gets all the values given the keys.
* Note that it does NOT work with hash values or list values @@ -255,6 +277,11 @@ public class RedisStore implements TransactionableKeyValueStore } } + /** + * Runs one Redis SCAN step starting at the given cursor. + * + * @param offset cursor returned by the previous call, or 0 to start a new iteration + * @param params scan options such as the COUNT hint + * @return the next cursor together with the keys returned by this step + */ + public ScanResult<String> ScanKeys(Integer offset, ScanParams params) + { + if (isInTransaction()) { + throw new RuntimeException("Cannot call scan when in redis transaction"); + } + return jedis.scan(offset.toString(), params); + } + /** * Calls hincrbyfloat on the redis store. * http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java new file mode 100644 index 0000000..08fb294 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java @@ -0,0 +1,193 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.datatorrent.contrib.redis; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import redis.clients.jedis.ScanParams; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.LocalMode; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KeyValPair; + +public class RedisInputOperatorTest +{ + private RedisStore operatorStore; + private RedisStore testStore; + + public static class CollectorModule extends BaseOperator + { + // synchronized: written by the operator thread and polled by the controller thread + volatile static List<KeyValPair<String, String>> resultMap = Collections.synchronizedList(new ArrayList<KeyValPair<String, String>>()); + static long resultCount = 0; + + public final transient DefaultInputPort<KeyValPair<String, String>> inputPort = new DefaultInputPort<KeyValPair<String, String>>() + { + @Override + public void process(KeyValPair<String, String> tuple) + { + resultMap.add(tuple); + resultCount++; + } + }; + } + + /** + * Runs the key-value input operator in local mode and checks that the three test keys are emitted. + */ + @Test + public void testInputOperator() throws IOException + { + this.operatorStore = new RedisStore(); + this.testStore = new RedisStore(); + + testStore.connect(); + CollectorModule.resultMap.clear(); + ScanParams params = new ScanParams(); + params.count(1); + + testStore.put("test_abc", "789"); + testStore.put("test_def", "456"); + testStore.put("test_ghi", "123"); + + try { + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + RedisKeyValueInputOperator inputOperator = dag.addOperator("input", new RedisKeyValueInputOperator()); + final CollectorModule collector = dag.addOperator("collector", new CollectorModule()); + + inputOperator.setStore(operatorStore); + dag.addStream("stream", inputOperator.outputPort,
 collector.inputPort); + final LocalMode.Controller lc = lma.getController(); + + new Thread("LocalClusterController") + { + @Override + public void run() + { + long startTms = System.currentTimeMillis(); + long timeout = 50000L; + try { + Thread.sleep(1000); + while (System.currentTimeMillis() - startTms < timeout) { + if (CollectorModule.resultMap.size() < 3) { + Thread.sleep(10); + } else { + break; + } + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + lc.shutdown(); + } + }.start(); + + lc.run(); + + Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_abc", "789"))); + Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_def", "456"))); + Assert.assertTrue(CollectorModule.resultMap.contains(new KeyValPair<String, String>("test_ghi", "123"))); + } finally { + // remove only the keys created by this test; the operator scans every key, so other keys may have been emitted too + testStore.remove("test_abc"); + testStore.remove("test_def"); + testStore.remove("test_ghi"); + testStore.disconnect(); + } + } + + /** + * Processes two windows, simulates a failure by redeploying a fresh operator and checks that + * replaying windows 1 and 2 emits the same number of tuples as the first run. + */ + @Test + public void testRecoveryAndIdempotency() throws Exception + { + this.operatorStore = new RedisStore(); + this.testStore = new RedisStore(); + + testStore.connect(); + ScanParams params = new ScanParams(); + params.count(1); + + testStore.put("test_abc", "789"); + testStore.put("test_def", "456"); + testStore.put("test_ghi", "123"); + + RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator(); + operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + + operator.setStore(operatorStore); + operator.setScanCount(1); + Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + CollectorTestSink<Object> sink = new CollectorTestSink<Object>(); + + operator.outputPort.setSink(sink); + OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap); + + try { + operator.setup(context); + operator.beginWindow(1); + operator.emitTuples(); + operator.endWindow(); + + int
 numberOfMessagesInWindow1 = sink.collectedTuples.size(); + sink.collectedTuples.clear(); + + operator.beginWindow(2); + operator.emitTuples(); + operator.endWindow(); + int numberOfMessagesInWindow2 = sink.collectedTuples.size(); + sink.collectedTuples.clear(); + + // failure and then re-deployment of operator + // Re-instantiating to reset values + operator = new RedisKeyValueInputOperator(); + operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + operator.setStore(operatorStore); + operator.setScanCount(1); + operator.outputPort.setSink(sink); + operator.setup(context); + + Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + + operator.beginWindow(1); + operator.emitTuples(); + operator.emitTuples(); + operator.endWindow(); + + Assert.assertEquals("num of messages in window 1", numberOfMessagesInWindow1, sink.collectedTuples.size()); + + sink.collectedTuples.clear(); + operator.beginWindow(2); + operator.emitTuples(); + operator.endWindow(); + Assert.assertEquals("num of messages in window 2", numberOfMessagesInWindow2, sink.collectedTuples.size()); + } finally { + // remove only the keys created by this test + testStore.remove("test_abc"); + testStore.remove("test_def"); + testStore.remove("test_ghi"); + sink.collectedTuples.clear(); + operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 5); + operator.teardown(); + testStore.disconnect(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java new file mode 100644 index 0000000..7792b5a --- /dev/null +++
b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisPOJOOperatorTest.java @@ -0,0 +1,230 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.redis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import redis.clients.jedis.ScanParams;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.redis.RedisInputOperatorTest.CollectorModule;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Tests for {@link RedisPOJOOutputOperator} and {@link RedisPOJOInputOperator}.
+ *
+ * NOTE(review): assumes a Redis server reachable through a default-constructed {@link RedisStore};
+ * confirm the expected host/port for the build environment.
+ */
+public class RedisPOJOOperatorTest
+{
+  /** Keys written to Redis by {@link #testInputOperator()} and deleted again afterwards. */
+  private static final String[] INPUT_KEYS = {"test_abc_in", "test_def_in", "test_ghi_in"};
+
+  private RedisStore operatorStore;
+  private RedisStore testStore;
+
+  /** Simple POJO whose fields the operators under test map to and from Redis map values. */
+  public static class TestClass
+  {
+    private Integer intValue;
+    private String stringValue;
+
+    public TestClass()
+    {
+    }
+
+    public TestClass(int v1, String v2)
+    {
+      intValue = v1;
+      stringValue = v2;
+    }
+
+    public Integer getIntValue()
+    {
+      return intValue;
+    }
+
+    public void setIntValue(int intValue)
+    {
+      this.intValue = intValue;
+    }
+
+    public String getStringValue()
+    {
+      return stringValue;
+    }
+
+    public void setStringValue(String stringValue)
+    {
+      this.stringValue = stringValue;
+    }
+  }
+
+  /**
+   * Writes one POJO through {@link RedisPOJOOutputOperator} and reads the stored map back with
+   * {@link RedisStore#getMap} to check both mapped columns.
+   */
+  @Test
+  public void testOutputOperator() throws IOException
+  {
+    this.operatorStore = new RedisStore();
+
+    operatorStore.connect();
+    String appId = "test_appid";
+    int operatorId = 0;
+
+    // NOTE(review): presumably clears state from an earlier run so window 101 below is not
+    // skipped as already committed.
+    operatorStore.removeCommittedWindowId(appId, operatorId);
+    operatorStore.disconnect();
+
+    RedisPOJOOutputOperator outputOperator = new RedisPOJOOutputOperator();
+
+    ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
+
+    fields.add(new FieldInfo("column1", "intValue", SupportType.INTEGER));
+    fields.add(new FieldInfo("column2", "getStringValue()", SupportType.STRING));
+
+    outputOperator.setDataColumns(fields);
+
+    try {
+      com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_ID, appId);
+
+      outputOperator.setStore(operatorStore);
+      outputOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes));
+      outputOperator.beginWindow(101);
+
+      KeyValPair<String, Object> keyVal = new KeyValPair<String, Object>("test_abc1", new TestClass(1, "abc"));
+
+      outputOperator.input.process(keyVal);
+
+      outputOperator.endWindow();
+
+      outputOperator.teardown();
+
+      // Connect again to read back what the operator wrote.
+      operatorStore.connect();
+
+      Map<String, String> out = operatorStore.getMap("test_abc1");
+      Assert.assertEquals("1", out.get("column1"));
+      Assert.assertEquals("abc", out.get("column2"));
+    } finally {
+      operatorStore.remove("test_abc1");
+      operatorStore.disconnect();
+    }
+  }
+
+  /**
+   * Sink operator that records each received tuple's value under its key in a static map.
+   */
+  public static class ObjectCollectorModule extends BaseOperator
+  {
+    // Written by the operator thread and polled by the controller thread in testInputOperator,
+    // so the map itself is synchronized (volatile only covers the reference).
+    volatile static Map<String, Object> resultMap = Collections.synchronizedMap(new HashMap<String, Object>());
+    static long resultCount = 0;
+
+    public final transient DefaultInputPort<KeyValPair<String, Object>> inputPort = new DefaultInputPort<KeyValPair<String, Object>>()
+    {
+      @Override
+      public void process(KeyValPair<String, Object> tuple)
+      {
+        resultMap.put(tuple.getKey(), tuple.getValue());
+        resultCount++;
+      }
+    };
+  }
+
+  /**
+   * Stores three maps in Redis, runs {@link RedisPOJOInputOperator} in a local DAG, and checks that
+   * each key arrives and that one value is converted into a populated {@link TestClass}.
+   */
+  @Test
+  public void testInputOperator() throws IOException
+  {
+    // NOTE(review): appears to exist only to tie this test to the janino test dependency.
+    @SuppressWarnings("unused")
+    Class<?> clazz = org.codehaus.janino.CompilerFactory.class;
+
+    this.operatorStore = new RedisStore();
+    this.testStore = new RedisStore();
+
+    // resultMap is static: discard entries from an earlier run in this JVM so the assertions
+    // below only see what this DAG emitted.
+    ObjectCollectorModule.resultMap.clear();
+
+    testStore.connect();
+
+    Map<String, String> value = new HashMap<String, String>();
+    value.put("Column1", "abc");
+    value.put("Column2", "1");
+
+    Map<String, String> value1 = new HashMap<String, String>();
+    value1.put("Column1", "def");
+    value1.put("Column2", "2");
+
+    Map<String, String> value2 = new HashMap<String, String>();
+    value2.put("Column1", "ghi");
+    value2.put("Column2", "3");
+
+    testStore.put("test_abc_in", value);
+    testStore.put("test_def_in", value1);
+    testStore.put("test_ghi_in", value2);
+
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      DAG dag = lma.getDAG();
+
+      RedisPOJOInputOperator inputOperator = dag.addOperator("input", new RedisPOJOInputOperator());
+      final ObjectCollectorModule collector = dag.addOperator("collector", new ObjectCollectorModule());
+
+      ArrayList<FieldInfo> fields = new ArrayList<FieldInfo>();
+
+      fields.add(new FieldInfo("Column1", "stringValue", SupportType.STRING));
+      fields.add(new FieldInfo("Column2", "intValue", SupportType.INTEGER));
+
+      inputOperator.setDataColumns(fields);
+      inputOperator.setOutputClass(TestClass.class.getName());
+
+      inputOperator.setStore(operatorStore);
+      dag.addStream("stream", inputOperator.outputPort, collector.inputPort);
+      final LocalMode.Controller lc = lma.getController();
+
+      // Shut the local cluster down once three keys have arrived, or when the timeout expires.
+      new Thread("LocalClusterController")
+      {
+        @Override
+        public void run()
+        {
+          long startTms = System.currentTimeMillis();
+          long timeout = 10000L;
+          try {
+            Thread.sleep(1000);
+            while (System.currentTimeMillis() - startTms < timeout) {
+              if (ObjectCollectorModule.resultMap.size() < 3) {
+                Thread.sleep(10);
+              } else {
+                break;
+              }
+            }
+          } catch (InterruptedException ex) {
+            // Stop waiting and shut down, but keep the interrupt status visible.
+            Thread.currentThread().interrupt();
+          }
+          lc.shutdown();
+        }
+      }.start();
+
+      lc.run();
+
+      Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_abc_in"));
+      Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_def_in"));
+      Assert.assertTrue(ObjectCollectorModule.resultMap.containsKey("test_ghi_in"));
+
+      TestClass a = (TestClass) ObjectCollectorModule.resultMap.get("test_abc_in");
+      Assert.assertNotNull(a);
+      Assert.assertEquals("abc", a.stringValue);
+      Assert.assertEquals("1", a.intValue.toString());
+    } finally {
+      // Delete exactly the keys written above. This test fills ObjectCollectorModule, so iterating
+      // another test's collector here would leave these keys behind in Redis.
+      for (String key : INPUT_KEYS) {
+        testStore.remove(key);
+      }
+      testStore.disconnect();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a57a3d75/demos/machinedata/pom.xml ---------------------------------------------------------------------- diff --git a/demos/machinedata/pom.xml b/demos/machinedata/pom.xml index 1f3f075..3498d0d 100644 --- a/demos/machinedata/pom.xml +++ b/demos/machinedata/pom.xml @@ -31,7 +31,7 @@ <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> - <version>2.2.1</version> + <version>2.5.1</version> </dependency> <dependency> <groupId>javax.mail</groupId>
