APEXMALHAR-2023 Adding Enrichment Operator to Malhar. Added two operators, POJOEnricher and MapEnricher, which enrich the given POJO or map as configured. The operators are marked as evolving. Test cases are added for both operators.
Two backend loaders are added, for File and JDBC. The HBase loader was removed because some design points still need to be discussed; its implementation is postponed until after that discussion. Added a new SupportType, OBJECT, to FieldInfo. Added a static method in FieldInfo for converting a Class to a SupportType. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9600eddd Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9600eddd Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9600eddd Branch: refs/heads/master Commit: 9600eddd8ee4fa010cbb4799da74e89d184daa40 Parents: 528e7ac Author: chinmaykolhatkar <chin...@datatorrent.com> Authored: Tue Mar 29 14:46:54 2016 +0530 Committer: chinmaykolhatkar <chin...@datatorrent.com> Committed: Fri Apr 29 22:59:48 2016 +0530 ---------------------------------------------------------------------- .../contrib/enrich/AbstractEnricher.java | 321 +++++++++++++++++++ .../contrib/enrich/BackendLoader.java | 42 +++ .../datatorrent/contrib/enrich/FSLoader.java | 184 +++++++++++ .../datatorrent/contrib/enrich/JDBCLoader.java | 201 ++++++++++++ .../datatorrent/contrib/enrich/MapEnricher.java | 138 ++++++++ .../contrib/enrich/NullValuesCacheManager.java | 60 ++++ .../contrib/enrich/POJOEnricher.java | 287 +++++++++++++++++ .../contrib/enrich/ReadOnlyBackup.java | 61 ++++ .../enrichment/AbstractEnrichmentOperator.java | 170 ---------- .../contrib/enrichment/EnrichmentBackup.java | 18 -- .../contrib/enrichment/FSLoader.java | 146 --------- .../contrib/enrichment/HBaseLoader.java | 128 -------- .../contrib/enrichment/JDBCLoader.java | 158 --------- .../enrichment/MapEnrichmentOperator.java | 57 ---- .../enrichment/NullValuesCacheManager.java | 43 --- .../enrichment/POJOEnrichmentOperator.java | 185 ----------- .../contrib/enrichment/ReadOnlyBackup.java | 38 --- .../contrib/enrichment/package-info.java | 1 - 
.../contrib/enrich/EmployeeOrder.java | 114 +++++++ .../contrib/enrich/FileEnrichmentTest.java | 103 ++++++ .../contrib/enrich/JDBCLoaderTest.java | 209 ++++++++++++ .../contrib/enrich/MapEnricherTest.java | 251 +++++++++++++++ .../com/datatorrent/contrib/enrich/Order.java | 71 ++++ .../contrib/enrich/POJOEnricherTest.java | 232 ++++++++++++++ .../enrichment/BeanEnrichmentOperatorTest.java | 95 ------ .../contrib/enrichment/EmployeeOrder.java | 95 ------ .../contrib/enrichment/FileEnrichmentTest.java | 75 ----- .../contrib/enrichment/HBaseLoaderTest.java | 162 ---------- .../contrib/enrichment/JDBCLoaderTest.java | 179 ----------- .../enrichment/MapEnrichmentOperatorTest.java | 152 --------- contrib/src/test/resources/productmapping.txt | 100 ++++++ .../com/datatorrent/lib/util/FieldInfo.java | 15 +- 32 files changed, 2388 insertions(+), 1703 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java new file mode 100644 index 0000000..cdefddf --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.NotNull; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.db.cache.CacheManager; +import com.datatorrent.lib.db.cache.CacheStore; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.FieldInfo.SupportType; + +/** + * Base class for Enrichment Operator. Subclasses should provide implementation to getKey and convert. + * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/> + * <p> + * Properties:<br> + * <b>lookupFields</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br> + * <b>includeFields</b>: List of comma separated fields to be replaced/added to the input tuple. 
Ex: Field1,Field2,Field3<br> + * <b>store</b>: Specify the type of loader for looking data<br> + * <br> + * + * @param <INPUT> Type of tuples which are received by this operator</T> + * @param <OUTPUT> Type of tuples which are emitted by this operator</T> + * @displayName Abstract Enrichment Operator + * @tags Enrichment + */ +@InterfaceStability.Evolving +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator implements Operator.ActivationListener +{ + /** + * Mandatory parameters for Enricher + */ + @NotNull + protected List<String> lookupFields; + @NotNull + protected List<String> includeFields; + @NotNull + private BackendLoader store; + + /** + * Optional parameters for enricher. + */ + private int cacheExpirationInterval = 1 * 60 * 60 * 1000; // 1 hour + private int cacheCleanupInterval = 1 * 60 * 60 * 1000; // 1 hour + private int cacheSize = 1024; // 1024 records + + /** + * Helper variables. + */ + private transient CacheManager cacheManager; + protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>(); + protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>(); + + /** + * This method needs to be called by implementing class for processing a tuple for enrichment. + * The method will take the tuple through following stages: + * <ol> + * <li>Call {@link #getKey(Object)} to retrieve key fields for lookup</li> + * <li>Using key fields call cache manager to retrieve for any key that is cached already</li> + * <li>If not found in cache, it'll do a lookup in configured backend store</li> + * <li>The retrieved fields will be passed to {@link #convert(Object, Object)} method to create the final object</li> + * <li>Finally {@link #emitEnrichedTuple(Object)} is called for emitting the tuple</li> + * </ol> + * + * @param tuple Input tuple that needs to get processed for enrichment. 
+ */ + protected void enrichTuple(INPUT tuple) + { + Object key = getKey(tuple); + if (key != null) { + Object result = cacheManager.get(key); + OUTPUT out = convert(tuple, result); + if (out != null) { + emitEnrichedTuple(out); + } + } + } + + /** + * The method should be implemented by concrete class which returns an ArrayList<Object> containing all the fields + * which forms key part of lookup. + * The order of field values should be same as the one set in {@link #lookupFields} variable. + * + * @param tuple Input tuple from which fields values for key needs to be fetched. + * @return Should return ArrayList<Object> which has fields values forming keys in same order as {@link #lookupFields} + */ + protected abstract Object getKey(INPUT tuple); + + /** + * The method should be implemented by concrete class. + * This method is expected to take input tuple and an externally fetched object containing fields to be enriched, and + * return an enriched tuple which is ready to be emitted. + * + * @param in Input tuple which needs to be enriched. + * @param cached ArrayList<Object> containing missing data retrieved from external sources. + * @return Enriched tuple of type OUTPUT + */ + protected abstract OUTPUT convert(INPUT in, Object cached); + + /** + * This method should be implemented by concrete class. + * The method is expected to emit tuple of type OUTPUT + * + * @param tuple Tuple of type OUTPUT that should be emitted. + */ + protected abstract void emitEnrichedTuple(OUTPUT tuple); + + /** + * This method should be implemented by concrete method. + * The method should return Class type of field for given fieldName from output tuple. + * + * @param fieldName Field name for which field type needs to be identified + * @return Class type for given field. + */ + protected abstract Class<?> getIncludeFieldType(String fieldName); + + /** + * This method should be implemented by concrete method. 
+ * The method should return Class type of field for given fieldName from input tuple. + * + * @param fieldName Field name for which field type needs to be identified + * @return Class type for given field. + */ + protected abstract Class<?> getLookupFieldType(String fieldName); + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + + cacheManager = new NullValuesCacheManager(); + CacheStore primaryCache = new CacheStore(); + + // set expiration to one day. + primaryCache.setEntryExpiryDurationInMillis(cacheExpirationInterval); + primaryCache.setCacheCleanupInMillis(cacheCleanupInterval); + primaryCache.setEntryExpiryStrategy(CacheStore.ExpiryType.EXPIRE_AFTER_WRITE); + primaryCache.setMaxCacheSize(cacheSize); + + cacheManager.setPrimary(primaryCache); + cacheManager.setBackup(store); + } + + @Override + public void activate(Context context) + { + for (String s : lookupFields) { + lookupFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getLookupFieldType(s)))); + } + + if (includeFields != null) { + for (String s : includeFields) { + includeFieldInfo.add(new FieldInfo(s, s, SupportType.getFromJavaType(getIncludeFieldType(s)))); + } + } + + store.setFieldInfo(lookupFieldInfo, includeFieldInfo); + + try { + cacheManager.initialize(); + } catch (IOException e) { + throw new RuntimeException("Unable to initialize primary cache", e); + } + } + + @Override + public void deactivate() + { + } + + /** + * Returns a list of fields which are used for lookup. + * + * @return List of fields + */ + public List<String> getLookupFields() + { + return lookupFields; + } + + /** + * Set fields on which lookup needs to happen in external store. + * This is a mandatory parameter. + * + * @param lookupFields List of fields on which lookup happens. 
+ */ + public void setLookupFields(List<String> lookupFields) + { + this.lookupFields = lookupFields; + } + + /** + * Returns a list of fields using which tuple is enriched + * + * @return List of fields. + */ + public List<String> getIncludeFields() + { + return includeFields; + } + + /** + * Sets list of fields to be fetched from external store for enriching the tuple. + * This is a mandatory parameter. + * + * @param includeFields List of fields. + */ + public void setIncludeFields(List<String> includeFields) + { + this.includeFields = includeFields; + } + + /** + * Returns the backend store which will enrich the tuple. + * + * @return Object of type {@link BackendLoader} + */ + public BackendLoader getStore() + { + return store; + } + + /** + * Sets backend store which will enrich the tuple. + * This is a mandatory parameter. + * + * @param store Object of type {@link BackendLoader} + */ + public void setStore(BackendLoader store) + { + this.store = store; + } + + /** + * Returns cache entry expiration interval in ms. + * This is an optional parameter. + * + * @return Cache entry expiration interval in ms + */ + public int getCacheExpirationInterval() + { + return cacheExpirationInterval; + } + + /** + * Sets cache entry expiration interval in ms. + * This is an optional parameter. + * + * @param cacheExpirationInterval Cache entry expiration interval in ms + */ + public void setCacheExpirationInterval(int cacheExpirationInterval) + { + this.cacheExpirationInterval = cacheExpirationInterval; + } + + /** + * Returns cache cleanup interval in ms. After this interval, cache cleanup operation will be performed. + * This is an optional parameter. + * + * @return cache cleanup interval in ms. + */ + public int getCacheCleanupInterval() + { + return cacheCleanupInterval; + } + + /** + * Set Cache cleanup interval in ms. After this interval, cache cleanup operation will be performed. + * This is an optional parameter. 
+ * + * @param cacheCleanupInterval cache cleanup interval in ms. + */ + public void setCacheCleanupInterval(int cacheCleanupInterval) + { + this.cacheCleanupInterval = cacheCleanupInterval; + } + + /** + * Get size (number of entries) of cache. + * + * @return Number of entries allowed in cache. + */ + public int getCacheSize() + { + return cacheSize; + } + + /** + * Set size (number of entries) of cache. + * + * @param cacheSize Number of entries allowed in cache. + */ + public void setCacheSize(int cacheSize) + { + this.cacheSize = cacheSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java new file mode 100644 index 0000000..9570329 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/BackendLoader.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.enrich; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.db.cache.CacheManager; +import com.datatorrent.lib.util.FieldInfo; + +/** + * Interface for store to be used in enrichment + */ +@InterfaceStability.Evolving +public interface BackendLoader extends CacheManager.Backup +{ + /** + * Set {@link FieldInfo} for lookup fields and also include fields. + * Calling this method is mandatory for correct functioning of backend loader. + * + * @param lookupFieldInfo List of {@link FieldInfo} that will be used as key in lookup. + * @param includeFieldInfo List of {@link FieldInfo} that will be retrieved from store. + */ + void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java new file mode 100644 index 0000000..71d3dce --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.esotericsoftware.kryo.NotNull; +import com.google.common.collect.Maps; +import com.datatorrent.lib.db.cache.CacheManager; +import com.datatorrent.lib.util.FieldInfo; + + +/** + * This implementation of {@link BackendLoader} loads the data from a given file into memory cache and serves queries + * from the cache. + * When this is set as primaryCache in {@link CacheManager}, CacheManager can call {@link #loadInitialData()} + * periodically to reload the file. 
+ * <p> + * The format of the input file is: + * <p> + * {"productCategory": 5, "productId": 0} + * {"productCategory": 4, "productId": 1} + * {"productCategory": 5, "productId": 2} + * {"productCategory": 5, "productId": 3} + * </p> + * Each line in the input file should be a valid json object which represents a record and each key/value pair in that + * json object represents the fields/value. + * <p> + * NOTE: This loader should be used with caution as all the data present in the file is loaded in memory because of + * which the memory consumption may go up. + */ +@InterfaceStability.Evolving +public class FSLoader extends ReadOnlyBackup +{ + @NotNull + private String fileName; + + private transient Path filePath; + private transient FileSystem fs; + private transient boolean connected; + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>(){}); + private static final Logger logger = LoggerFactory.getLogger(FSLoader.class); + + public String getFileName() + { + return fileName; + } + + public void setFileName(String fileName) + { + this.fileName = fileName; + } + + @Override + public Map<Object, Object> loadInitialData() + { + Map<Object, Object> result = null; + FSDataInputStream in = null; + BufferedReader bin = null; + try { + result = Maps.newHashMap(); + in = fs.open(filePath); + bin = new BufferedReader(new InputStreamReader(in)); + String line; + while ((line = bin.readLine()) != null) { + try { + Map<String, Object> tuple = reader.readValue(line); + result.put(getKey(tuple), getValue(tuple)); + } catch (JsonProcessingException parseExp) { + logger.info("Unable to parse line {}", line); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (bin != null) { + IOUtils.closeQuietly(bin); + } + if (in != null) { + IOUtils.closeQuietly(in); + } + try { + fs.close(); + } catch (IOException e) { + throw new 
RuntimeException(e); + } + } + logger.debug("loading initial data {}", result.size()); + return result; + } + + private Object getValue(Map<String, Object> tuple) + { + ArrayList<Object> includeTuple = new ArrayList<Object>(); + for (FieldInfo s : includeFieldInfo) { + includeTuple.add(tuple.get(s.getColumnName())); + } + return includeTuple; + } + + private Object getKey(Map<String, Object> tuple) + { + ArrayList<Object> list = new ArrayList<Object>(); + for (FieldInfo key : lookupFieldInfo) { + list.add(tuple.get(key.getColumnName())); + } + return list; + } + + @Override + public Object get(Object key) + { + return null; + } + + @Override + public List<Object> getAll(List<Object> keys) + { + return null; + } + + @Override + public void connect() throws IOException + { + Configuration conf = new Configuration(); + this.filePath = new Path(fileName); + this.fs = FileSystem.newInstance(filePath.toUri(), conf); + if (!fs.isFile(filePath)) { + throw new IOException("Provided path " + fileName + " is not a file"); + } + connected = true; + } + + @Override + public void disconnect() throws IOException + { + if (fs != null) { + fs.close(); + } + } + + @Override + public boolean isConnected() + { + return connected; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java new file mode 100644 index 0000000..d20e87b --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/JDBCLoader.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.Lists; +import com.datatorrent.lib.db.jdbc.JdbcStore; +import com.datatorrent.lib.util.FieldInfo; + +/** + * <p>HBaseLoader extends from {@link JdbcStore} uses JDBC to connect and implements BackendLoaders interface.</p> <br/> + * <p> + * Properties:<br> + * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br> + * <b>tableName</b>: JDBC table name<br> + * <br> + */ +@InterfaceStability.Evolving +public class JDBCLoader extends JdbcStore implements BackendLoader +{ + protected String queryStmt; + + protected String tableName; + + protected transient List<FieldInfo> includeFieldInfo; + protected transient List<FieldInfo> lookupFieldInfo; + + protected Object getQueryResult(Object key) + { + try { + PreparedStatement getStatement = getConnection().prepareStatement(queryStmt); + ArrayList<Object> keys = (ArrayList<Object>)key; + for (int i = 0; i < keys.size(); i++) { + getStatement.setObject(i + 1, keys.get(i)); + } + return getStatement.executeQuery(); + } catch 
(SQLException e) { + throw new RuntimeException(e); + } + } + + protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException + { + try { + ResultSet resultSet = (ResultSet)result; + if (resultSet.next()) { + ResultSetMetaData rsdata = resultSet.getMetaData(); + // If the includefields is empty, populate it from ResultSetMetaData + if (CollectionUtils.isEmpty(includeFieldInfo)) { + if (includeFieldInfo == null) { + includeFieldInfo = new ArrayList<>(); + } + for (int i = 1; i <= rsdata.getColumnCount(); i++) { + String columnName = rsdata.getColumnName(i); + // TODO: Take care of type conversion. + includeFieldInfo.add(new FieldInfo(columnName, columnName, FieldInfo.SupportType.OBJECT)); + } + } + + ArrayList<Object> res = new ArrayList<Object>(); + for (FieldInfo f : includeFieldInfo) { + res.add(getConvertedData(resultSet.getObject(f.getColumnName()), f)); + } + return res; + } else { + return null; + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private Object getConvertedData(Object object, FieldInfo f) + { + if (f.getType().getJavaType() == object.getClass()) { + return object; + } else { + logger.warn("Type mismatch seen for field {}, returning as it is", f.getColumnName()); + return null; + } + } + + private String generateQueryStmt() + { + String stmt = "select * from " + tableName + " where "; + boolean first = true; + for (FieldInfo fieldInfo : lookupFieldInfo) { + if (first) { + first = false; + } else { + stmt += " and "; + } + stmt += fieldInfo.getColumnName() + " = ?"; + } + + logger.info("generateQueryStmt: {}", stmt); + return stmt; + } + + public String getQueryStmt() + { + return queryStmt; + } + + /** + * Set the sql Prepared Statement if the enrichment mechanism is query based. + */ + public void setQueryStmt(String queryStmt) + { + this.queryStmt = queryStmt; + } + + public String getTableName() + { + return tableName; + } + + /** + * Set the table name. 
+ */ + public void setTableName(String tableName) + { + this.tableName = tableName; + } + + @Override + public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo) + { + this.lookupFieldInfo = lookupFieldInfo; + this.includeFieldInfo = includeFieldInfo; + if (queryStmt == null) { + queryStmt = generateQueryStmt(); + } + } + + @Override + public Map<Object, Object> loadInitialData() + { + return null; + } + + @Override + public Object get(Object key) + { + return getDataFrmResult(getQueryResult(key)); + } + + @Override + public List<Object> getAll(List<Object> keys) + { + List<Object> values = Lists.newArrayList(); + for (Object key : keys) { + values.add(get(key)); + } + return values; + } + + @Override + public void put(Object key, Object value) + { + throw new UnsupportedOperationException("Not supported operation"); + } + + @Override + public void putAll(Map<Object, Object> m) + { + throw new UnsupportedOperationException("Not supported operation"); + } + + @Override + public void remove(Object key) + { + throw new UnsupportedOperationException("Not supported operation"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java new file mode 100644 index 0000000..ecf16ba --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/MapEnricher.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.util.FieldInfo; + +/** + * This class takes a HashMap tuple as input and extracts value of the lookupKey configured + * for this operator. It perform a lookup using {@link com.datatorrent.lib.db.cache.CacheManager} to + * find a matching entry and adds the result to the original tuple. 
+ * + * <p> + * Example: + * Lets say, input tuple is + * { amount=10.0, channelId=4, productId=3 } + * The tuple is modified as below: + * { amount=10.0, channelId=4, productId=3, <b>productCategory=1 </b>} + * </p> + * + * @displayName MapEnricher + * @category Database + * @tags enrichment, lookup, map + */ +@InterfaceStability.Evolving +public class MapEnricher extends AbstractEnricher<Map<String, Object>, Map<String, Object>> +{ + public final transient DefaultInputPort<Map<String, Object>> input = new DefaultInputPort<Map<String, Object>>() + { + @Override + public void process(Map<String, Object> obj) + { + processTuple(obj); + } + }; + + public final transient DefaultOutputPort<Map<String, Object>> output = new DefaultOutputPort<>(); + + protected void processTuple(Map<String, Object> obj) + { + enrichTuple(obj); + } + + @Override + protected Object getKey(Map<String, Object> tuple) + { + ArrayList<Object> keyList = new ArrayList<Object>(); + + for (FieldInfo fieldInfo : lookupFieldInfo) { + keyList.add(tuple.get(fieldInfo.getColumnName())); + } + + return keyList; + } + + @Override + protected Map<String, Object> convert(Map<String, Object> in, Object cached) + { + if (cached == null) { + return in; + } + + ArrayList<Object> newAttributes = (ArrayList<Object>)cached; + if (newAttributes != null) { + for (int i = 0; i < includeFieldInfo.size(); i++) { + in.put(includeFieldInfo.get(i).getColumnName(), newAttributes.get(i)); + } + } + return in; + } + + @Override + protected void emitEnrichedTuple(Map<String, Object> tuple) + { + output.emit(tuple); + } + + @Override + protected Class<?> getIncludeFieldType(String fieldName) + { + return Object.class; + } + + @Override + protected Class<?> getLookupFieldType(String fieldName) + { + return Object.class; + } + + /** + * Set fields on which lookup against which lookup will be performed. + * This is a mandatory parameter to set. + * + * @param lookupFields List of fields on which lookup happens. 
+ * @description $[] Field which become part of lookup key + */ + @Override + public void setLookupFields(List<String> lookupFields) + { + super.setLookupFields(lookupFields); + } + + /** + * Set fields which will enrich the map. + * This is a mandatory parameter to set. + * + * @param includeFields List of fields. + * @description $[] Field which are fetched from store + */ + @Override + public void setIncludeFields(List<String> includeFields) + { + super.setIncludeFields(includeFields); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java new file mode 100644 index 0000000..2cf7326 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/NullValuesCacheManager.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.enrich; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.db.cache.CacheManager; + +/** + * Null Values Cache Manager. Using this NULL entries can be specified explicitly. + */ +@InterfaceStability.Evolving +public class NullValuesCacheManager extends CacheManager +{ + + private static final NullObject NULL = new NullObject(); + + @Override + public Object get(Object key) + { + Object primaryVal = primary.get(key); + if (primaryVal != null) { + if (primaryVal == NULL) { + return null; + } + + return primaryVal; + } + + Object backupVal = backup.get(key); + if (backupVal != null) { + primary.put(key, backupVal); + } else { + primary.put(key, NULL); + } + + return backupVal; + } + + private static class NullObject + { + } +} + http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java new file mode 100644 index 0000000..782fbc5 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.ClassUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.PojoUtils; + + +/** + * This class takes a POJO as input and extracts the value of the lookupKey configured + * for this operator. It performs a lookup using {@link com.datatorrent.lib.db.cache.CacheManager} to + * find a matching entry and adds the result to the original tuple.
+ * + * <p> + * Properties:<br> + * <b>inputClass</b>: Class to be loaded for the incoming data type<br> + * <b>outputClass</b>: Class to be loaded for the emitted data type<br> + * <br> + * <p> + * + * <p> + * Example: + * Lets say, input tuple is + * { amount=10.0, channelId=4, productId=3 } + * The tuple is modified as below: + * { amount=10.0, channelId=4, productId=3, <b>productCategory=1 </b>} + * </p> + * + * @displayName POJOEnricher + * @category Database + * @tags enrichment, enricher, pojo, schema, lookup + */ +@InterfaceStability.Evolving +public class POJOEnricher extends AbstractEnricher<Object, Object> +{ + private static final Logger logger = LoggerFactory.getLogger(POJOEnricher.class); + + /** + * Helper fields + */ + protected Class<?> inputClass; + protected Class<?> outputClass; + private transient Map<PojoUtils.Getter, PojoUtils.Setter> fieldMap = new HashMap<>(); + private transient List<PojoUtils.Setter> includeSetters = new ArrayList<>(); + private transient List<PojoUtils.Getter> lookupGetters = new ArrayList<>(); + + /** + * AutoMetrics + */ + @AutoMetric + private int enrichedTupleCount; + @AutoMetric + private int errorTupleCount; + + @InputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() + { + @Override + public void setup(Context.PortContext context) + { + inputClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + + @Override + public void process(Object object) + { + processTuple(object); + } + }; + + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>() + { + @Override + public void setup(Context.PortContext context) + { + outputClass = context.getValue(Context.PortContext.TUPLE_CLASS); + } + }; + + public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<>(); + + protected void processTuple(Object object) + { + enrichTuple(object); 
+ } + + @Override + public void beginWindow(long windowId) + { + enrichedTupleCount = 0; + errorTupleCount = 0; + } + + @Override + protected Object getKey(Object tuple) + { + ArrayList<Object> keyList = new ArrayList<>(); + for (PojoUtils.Getter lookupGetter : lookupGetters) { + keyList.add(lookupGetter.get(tuple)); + } + return keyList; + } + + @Override + protected Object convert(Object in, Object cached) + { + Object o; + + try { + o = outputClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + logger.error("Failed to create new instance of output POJO", e); + errorTupleCount++; + error.emit(in); + return null; + } + + try { + for (Map.Entry<PojoUtils.Getter, PojoUtils.Setter> entry : fieldMap.entrySet()) { + entry.getValue().set(o, entry.getKey().get(in)); + } + } catch (RuntimeException e) { + logger.error("Failed to set the property. Continuing with default.", e); + errorTupleCount++; + error.emit(in); + return null; + } + + if (cached == null) { + return o; + } + + ArrayList<Object> includeObjects = (ArrayList<Object>)cached; + for (int i = 0; i < includeSetters.size(); i++) { + try { + includeSetters.get(i).set(o, includeObjects.get(i)); + } catch (RuntimeException e) { + logger.error("Failed to set the property. 
Continuing with default.", e); + errorTupleCount++; + error.emit(in); + return null; + } + } + + return o; + } + + @Override + protected void emitEnrichedTuple(Object tuple) + { + output.emit(tuple); + enrichedTupleCount++; + } + + @Override + protected Class<?> getIncludeFieldType(String fieldName) + { + try { + return outputClass.getDeclaredField(fieldName).getType(); + } catch (NoSuchFieldException e) { + logger.warn("Failed to find given fieldName, returning object type", e); + return Object.class; + } + } + + @Override + protected Class<?> getLookupFieldType(String fieldName) + { + try { + return inputClass.getDeclaredField(fieldName).getType(); + } catch (NoSuchFieldException e) { + logger.warn("Failed to find given fieldName, returning object type", e); + return Object.class; + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private PojoUtils.Setter generateSettersForField(Class<?> klass, String outputFieldName) + throws NoSuchFieldException, SecurityException + { + Field f = klass.getDeclaredField(outputFieldName); + Class c = ClassUtils.primitiveToWrapper(f.getType()); + return PojoUtils.createSetter(klass, outputFieldName, c); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private PojoUtils.Getter generateGettersForField(Class<?> klass, String inputFieldName) + throws NoSuchFieldException, SecurityException + { + Field f = klass.getDeclaredField(inputFieldName); + Class c = ClassUtils.primitiveToWrapper(f.getType()); + return PojoUtils.createGetter(klass, inputFieldName, c); + } + + @Override + public void activate(Context context) + { + super.activate(context); + + for (Field field : inputClass.getDeclaredFields()) { + try { + fieldMap.put(generateGettersForField(inputClass, field.getName()), + generateSettersForField(outputClass, field.getName())); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Unable to find field with name " + field.getName() + ", ignoring that field.", e); + } + } + + for (FieldInfo fieldInfo 
: this.includeFieldInfo) { + try { + includeSetters.add(generateSettersForField(outputClass, fieldInfo.getColumnName())); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Given field name is not present in output POJO", e); + } + } + + for (FieldInfo fieldInfo : this.lookupFieldInfo) { + try { + lookupGetters.add(generateGettersForField(inputClass, fieldInfo.getColumnName())); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Given lookup field is not present in POJO", e); + } + } + } + + /** + * Set the fields against which the lookup will be performed. + * This is a mandatory parameter to set. + * + * @param lookupFields List of fields on which lookup happens. + * @description $[] Field which become part of lookup key + * @useSchema $[] input.fields[].name + */ + @Override + public void setLookupFields(List<String> lookupFields) + { + super.setLookupFields(lookupFields); + } + + /** + * Set fields which will enrich the POJO. + * This is a mandatory parameter to set. + * + * @param includeFields List of fields. + * @description $[] Field which are fetched from store + * @useSchema $[] output.fields[].name + */ + @Override + public void setIncludeFields(List<String> includeFields) + { + super.setIncludeFields(includeFields); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java new file mode 100644 index 0000000..157dbc9 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/ReadOnlyBackup.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.util.FieldInfo; + +/** + * ReadOnly abstract implementation of BackendLoader. + */ +@InterfaceStability.Evolving +public abstract class ReadOnlyBackup implements BackendLoader +{ + protected transient List<FieldInfo> includeFieldInfo; + protected transient List<FieldInfo> lookupFieldInfo; + + @Override + public void put(Object key, Object value) + { + throw new UnsupportedOperationException("Not supported operation"); + } + + @Override + public void putAll(Map<Object, Object> m) + { + throw new UnsupportedOperationException("Not supported operation"); + } + + @Override + public void remove(Object key) + { + throw new UnsupportedOperationException("Not supported operation"); + } + + @Override + public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo) + { + this.includeFieldInfo = includeFieldInfo; + this.lookupFieldInfo = lookupFieldInfo; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java 
---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java deleted file mode 100644 index cbd4d5e..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java +++ /dev/null @@ -1,170 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.datatorrent.lib.db.cache.CacheManager; -import com.datatorrent.lib.db.cache.CacheStore; -import com.datatorrent.lib.db.cache.CacheStore.ExpiryType; -import com.esotericsoftware.kryo.NotNull; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Base class for Enrichment Operator. Subclasses should provide implementation to getKey and convert. - * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/> - * - * Properties:<br> - * <b>lookupFieldsStr</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br> - * <b>includeFieldsStr</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br> - * <b>store</b>: Specify the type of loader for looking data<br> - * <br> - * - * - * @displayName Abstract Enrichment Operator - * @tags Enrichment - * @param <INPUT> Type of tuples which are received by this operator</T> - * @param <OUTPUT> Type of tuples which are emitted by this operator</T> - * @since 2.1.0 - */ -public abstract class AbstractEnrichmentOperator<INPUT, OUTPUT> extends BaseOperator -{ - /** - * Keep lookup data cache for fast access. 
- */ - private transient CacheManager cacheManager; - - private transient CacheStore primaryCache = new CacheStore(); - - private int entryExpiryDurationInMillis = 24 * 60 * 60 * 1000; - private int cacheCleanupInMillis = 24 * 60 * 60 * 1000; - private int cacheSize = 1024; - - public transient DefaultOutputPort<OUTPUT> output = new DefaultOutputPort<OUTPUT>(); - - @InputPortFieldAnnotation(optional = true) - public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>() - { - @Override public void process(INPUT tuple) - { - processTuple(tuple); - } - }; - - private EnrichmentBackup store; - - @NotNull - protected String lookupFieldsStr; - - protected String includeFieldsStr; - - protected transient List<String> lookupFields = new ArrayList<String>(); - protected transient List<String> includeFields = new ArrayList<String>(); - - protected void processTuple(INPUT tuple) { - Object key = getKey(tuple); - if(key != null) { - Object result = cacheManager.get(key); - OUTPUT out = convert(tuple, result); - emitTuple(out); - } - } - - protected abstract Object getKey(INPUT tuple); - - protected void emitTuple(OUTPUT tuple) { - output.emit(tuple); - } - - /* Add data from cached value to input field */ - protected abstract OUTPUT convert(INPUT in, Object cached); - - @Override public void setup(Context.OperatorContext context) - { - super.setup(context); - - cacheManager = new NullValuesCacheManager(); - - // set expiration to one day. 
- primaryCache.setEntryExpiryDurationInMillis(entryExpiryDurationInMillis); - primaryCache.setCacheCleanupInMillis(cacheCleanupInMillis); - primaryCache.setEntryExpiryStrategy(ExpiryType.EXPIRE_AFTER_WRITE); - primaryCache.setMaxCacheSize(cacheSize); - - lookupFields = Arrays.asList(lookupFieldsStr.split(",")); - if (includeFieldsStr != null) { - includeFields = Arrays.asList(includeFieldsStr.split(",")); - } - - try { - store.setFields(lookupFields, includeFields); - - cacheManager.setPrimary(primaryCache); - cacheManager.setBackup(store); - cacheManager.initialize(); - } catch (IOException e) { - throw new RuntimeException("Unable to initialize primary cache", e); - } - } - - /** - * Set the type of backup store for storing and searching data. - */ - public void setStore(EnrichmentBackup store) { - this.store = store; - } - - public EnrichmentBackup getStore() { - return store; - } - - public CacheStore getPrimaryCache() - { - return primaryCache; - } - - public String getLookupFieldsStr() - { - return lookupFieldsStr; - } - - /** - * Set the lookup fields for quick searching. It would be in comma separated list - */ - public void setLookupFieldsStr(String lookupFieldsStr) - { - this.lookupFieldsStr = lookupFieldsStr; - } - - public String getIncludeFieldsStr() - { - return includeFieldsStr; - } - - /** - * Set the list of comma separated fields to be added/replaced to the incoming tuple. 
- */ - public void setIncludeFieldsStr(String includeFieldsStr) - { - this.includeFieldsStr = includeFieldsStr; - } - - public void setEntryExpiryDurationInMillis(int entryExpiryDurationInMillis) - { - this.entryExpiryDurationInMillis = entryExpiryDurationInMillis; - } - - public void setCacheCleanupInMillis(int cacheCleanupInMillis) - { - this.cacheCleanupInMillis = cacheCleanupInMillis; - } - - public void setCacheSize(int cacheSize) - { - this.cacheSize = cacheSize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java deleted file mode 100644 index 9155b13..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Copyright (c) 2015 DataTorrent, Inc. - * All rights reserved. 
- */ - -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.lib.db.cache.CacheManager; -import java.util.List; -/** - * @since 3.1.0 - */ - -public interface EnrichmentBackup extends CacheManager.Backup -{ - public void setFields(List<String> lookupFields,List<String> includeFields); - public boolean needRefresh(); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java deleted file mode 100644 index 2effed0..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (c) 2015 DataTorrent, Inc. - * All rights reserved. - */ -package com.datatorrent.contrib.enrichment; - -import com.esotericsoftware.kryo.NotNull; -import com.google.common.collect.Maps; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.codehaus.jackson.JsonProcessingException; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectReader; -import org.codehaus.jackson.type.TypeReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -/** - * @since 3.1.0 - */ - -public class FSLoader extends ReadOnlyBackup -{ - @NotNull - private String fileName; - - private transient Path filePath; - private transient FileSystem fs; - private transient boolean connected; - - 
private transient static final ObjectMapper mapper = new ObjectMapper(); - private transient static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>() - { - }); - private transient static final Logger logger = LoggerFactory.getLogger(FSLoader.class); - - public String getFileName() - { - return fileName; - } - - public void setFileName(String fileName) - { - this.fileName = fileName; - } - - @Override public Map<Object, Object> loadInitialData() - { - Map<Object, Object> result = null; - FSDataInputStream in = null; - BufferedReader bin = null; - try { - result = Maps.newHashMap(); - in = fs.open(filePath); - bin = new BufferedReader(new InputStreamReader(in)); - String line; - while ((line = bin.readLine()) != null) { - try { - Map<String, Object> tuple = reader.readValue(line); - if(CollectionUtils.isEmpty(includeFields)) { - if(includeFields == null) - includeFields = new ArrayList<String>(); - for (Map.Entry<String, Object> e : tuple.entrySet()) { - includeFields.add(e.getKey()); - } - } - ArrayList<Object> includeTuple = new ArrayList<Object>(); - for(String s: includeFields) { - includeTuple.add(tuple.get(s)); - } - result.put(getKey(tuple), includeTuple); - } catch (JsonProcessingException parseExp) { - logger.info("Unable to parse line {}", line); - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - if(bin != null) - IOUtils.closeQuietly(bin); - if(in != null) - IOUtils.closeQuietly(in); - try { - fs.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - logger.debug("loading initial data {}", result.size()); - return result; - } - - private Object getKey(Map<String, Object> tuple) - { - ArrayList<Object> lst = new ArrayList<Object>(); - for(String key : lookupFields) { - lst.add(tuple.get(key)); - } - return lst; - } - - @Override public Object get(Object key) - { - return null; - } - - @Override public List<Object> getAll(List<Object> keys) - { - return null; - } - - 
@Override public void connect() throws IOException - { - Configuration conf = new Configuration(); - this.filePath = new Path(fileName); - this.fs = FileSystem.newInstance(filePath.toUri(), conf); - if (!fs.isFile(filePath)) - throw new IOException("Provided path " + fileName + " is not a file"); - connected = true; - } - - @Override public void disconnect() throws IOException - { - if (fs != null) - fs.close(); - } - - @Override public boolean isConnected() - { - return connected; - } - - @Override - public boolean needRefresh() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java deleted file mode 100755 index 7040a7a..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java +++ /dev/null @@ -1,128 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.contrib.hbase.HBaseStore; -import com.google.common.collect.Lists; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import org.apache.commons.collections.CollectionUtils; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * <p>HBaseLoader extends from {@link HBaseStore} uses HBase to connect and implements EnrichmentBackup interface.</p> <br/> - * - * Properties:<br> - * <b>includeFamilys</b>: List of comma separated families and each family corresponds to the group name of column fields in includeFieldsStr. 
Ex: Family1,Family2<br> - * <br> - * - * @displayName HBaseLoader - * @tags Loader - * @since 2.1.0 - */ -public class HBaseLoader extends HBaseStore implements EnrichmentBackup -{ - protected transient List<String> includeFields; - protected transient List<String> lookupFields; - protected transient List<String> includeFamilys; - - protected Object getQueryResult(Object key) - { - try { - Get get = new Get(getRowBytes(((ArrayList)key).get(0))); - int idx = 0; - for(String f : includeFields) { - get.addColumn(Bytes.toBytes(includeFamilys.get(idx++)), Bytes.toBytes(f)); - } - return getTable().get(get); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - protected ArrayList<Object> getDataFrmResult(Object result) - { - Result res = (Result)result; - if (res == null || res.isEmpty()) - return null; - ArrayList<Object> columnInfo = new ArrayList<Object>(); - - if(CollectionUtils.isEmpty(includeFields)) { - if(includeFields == null) { - includeFields = new ArrayList<String>(); - includeFamilys.clear(); - includeFamilys = new ArrayList<String>(); - } - for (KeyValue kv: res.raw()) { - includeFields.add(new String(kv.getQualifier())); - includeFamilys.add(new String(kv.getFamily())); - } - } - for(KeyValue kv : res.raw()) { - columnInfo.add(kv.getValue()); - } - return columnInfo; - } - - private byte[] getRowBytes(Object key) - { - return ((String)key).getBytes(); - } - - @Override public void setFields(List<String> lookupFields,List<String> includeFields) - { - this.includeFields = includeFields; - this.lookupFields = lookupFields; - } - - /** - * Set the familyStr and would be in the form of comma separated list. 
- */ - public void setIncludeFamilyStr(String familyStr) - { - this.includeFamilys = Arrays.asList(familyStr.split(",")); - } - - @Override public boolean needRefresh() - { - return false; - } - - @Override public Map<Object, Object> loadInitialData() - { - return null; - } - - @Override public Object get(Object key) - { - return getDataFrmResult(getQueryResult(key)); - } - - @Override public List<Object> getAll(List<Object> keys) - { - List<Object> values = Lists.newArrayList(); - for (Object key : keys) { - values.add(get(key)); - } - return values; - } - - @Override public void put(Object key, Object value) - { - throw new RuntimeException("Not supported operation"); - } - - @Override public void putAll(Map<Object, Object> m) - { - throw new RuntimeException("Not supported operation"); - } - - @Override public void remove(Object key) - { - throw new RuntimeException("Not supported operation"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java deleted file mode 100644 index 3b1a8cf..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java +++ /dev/null @@ -1,158 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.lib.db.jdbc.JdbcStore; -import com.google.common.collect.Lists; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.commons.collections.CollectionUtils; - -/** - * <p>HBaseLoader extends from {@link JdbcStore} uses JDBC to connect and implements EnrichmentBackup interface.</p> <br/> - * - * 
Properties:<br> - * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br> - * <b>tableName</b>: JDBC table name<br> - * <br> - * - * @displayName JDBCLoader - * @tags Loader - * @since 2.1.0 - */ -public class JDBCLoader extends JdbcStore implements EnrichmentBackup -{ - protected String queryStmt; - - protected String tableName; - - protected transient List<String> includeFields; - protected transient List<String> lookupFields; - - protected Object getQueryResult(Object key) - { - try { - PreparedStatement getStatement = getConnection().prepareStatement(queryStmt); - ArrayList<Object> keys = (ArrayList<Object>) key; - for (int i = 0; i < keys.size(); i++) { - getStatement.setObject(i+1, keys.get(i)); - } - return getStatement.executeQuery(); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException - { - try { - ResultSet resultSet = (ResultSet) result; - if (resultSet.next()) { - ResultSetMetaData rsdata = resultSet.getMetaData(); - // If the includefields is empty, populate it from ResultSetMetaData - if(CollectionUtils.isEmpty(includeFields)) { - if(includeFields == null) - includeFields = new ArrayList<String>(); - for (int i = 1; i <= rsdata.getColumnCount(); i++) { - includeFields.add(rsdata.getColumnName(i)); - } - } - ArrayList<Object> res = new ArrayList<Object>(); - for(String f : includeFields) { - res.add(resultSet.getObject(f)); - } - return res; - } else - return null; - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - private String generateQueryStmt() - { - String stmt = "select * from " + tableName + " where "; - for (int i = 0; i < lookupFields.size(); i++) { - stmt = stmt + lookupFields.get(i) + " = ? 
"; - if(i != lookupFields.size() - 1) { - stmt = stmt + " and "; - } - } - logger.info("generateQueryStmt: {}", stmt); - return stmt; - } - - public String getQueryStmt() - { - return queryStmt; - } - - @Override - public boolean needRefresh() { - return false; - } - - /** - * Set the sql Prepared Statement if the enrichment mechanism is query based. - */ - public void setQueryStmt(String queryStmt) - { - this.queryStmt = queryStmt; - } - - public String getTableName() - { - return tableName; - } - /** - * Set the table name. - */ - public void setTableName(String tableName) - { - this.tableName = tableName; - } - - @Override public void setFields(List<String> lookupFields,List<String> includeFields) - { - this.includeFields = includeFields; - this.lookupFields = lookupFields; - if(queryStmt == null) - queryStmt = generateQueryStmt(); - } - @Override public Map<Object, Object> loadInitialData() - { - return null; - } - - @Override public Object get(Object key) - { - return getDataFrmResult(getQueryResult(key)); - } - - @Override public List<Object> getAll(List<Object> keys) - { - List<Object> values = Lists.newArrayList(); - for (Object key : keys) { - values.add(get(key)); - } - return values; - } - - @Override public void put(Object key, Object value) - { - throw new RuntimeException("Not supported operation"); - } - - @Override public void putAll(Map<Object, Object> m) - { - throw new RuntimeException("Not supported operation"); - } - - @Override public void remove(Object key) - { - throw new RuntimeException("Not supported operation"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java deleted file 
mode 100644 index 040b5ae..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import java.util.ArrayList; -import java.util.Map; - -/** - * - * This class takes a HashMap tuple as input and extract the value of the lookupKey configured - * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs - * specified in the file/DB or based on include fields are added to original tuple. - * - * Example - * The file contains data in json format, one entry per line. during setup entire file is read and - * kept in memory for quick lookup. - * If file contains following lines, and operator is configured with lookup key "productId" - * { "productId": 1, "productCategory": 3 } - * { "productId": 4, "productCategory": 10 } - * { "productId": 3, "productCategory": 1 } - * - * And input tuple is - * { amount=10.0, channelId=4, productId=3 } - * - * The tuple is modified as below before operator emits it on output port. 
- * { amount=10.0, channelId=4, productId=3, productCategory=1 } - * - * - * @displayName MapEnrichment - * @category Database - * @tags enrichment, lookup - * - * @since 2.1.0 - */ -public class MapEnrichmentOperator extends AbstractEnrichmentOperator<Map<String, Object>, Map<String, Object>> -{ - @Override protected Object getKey(Map<String, Object> tuple) - { - ArrayList<Object> keyList = new ArrayList<Object>(); - for(String key : lookupFields) { - keyList.add(tuple.get(key)); - } - return keyList; - } - - @Override protected Map<String, Object> convert(Map<String, Object> in, Object cached) - { - if (cached == null) - return in; - - ArrayList<Object> newAttributes = (ArrayList<Object>)cached; - if(newAttributes != null) { - for (int i = 0; i < includeFields.size(); i++) { - in.put(includeFields.get(i), newAttributes.get(i)); - } - } - return in; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java deleted file mode 100644 index f668683..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (c) 2015 DataTorrent, Inc. - * All rights reserved. 
- */ - -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.lib.db.cache.CacheManager; - -/** - * @since 3.1.0 - */ -public class NullValuesCacheManager extends CacheManager -{ - - private static final NullObject NULL = new NullObject(); - @Override - public Object get(Object key) - { - Object primaryVal = primary.get(key); - if (primaryVal != null) { - if (primaryVal == NULL) { - return null; - } - - return primaryVal; - } - - Object backupVal = backup.get(key); - if (backupVal != null) { - primary.put(key, backupVal); - } else { - primary.put(key, NULL); - } - return backupVal; - - } - - private static class NullObject - { - } -} - http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java deleted file mode 100644 index e707198..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.api.Context; -import com.datatorrent.lib.util.PojoUtils; -import com.datatorrent.lib.util.PojoUtils.Getter; -import com.datatorrent.lib.util.PojoUtils.Setter; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -import com.esotericsoftware.kryo.NotNull; -import org.apache.commons.lang3.ClassUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * This class takes a POJO as input and extract the value of the lookupKey configured - * for this operator. 
It then does a lookup in file/DB to find matching entry and all key-value pairs - * specified in the file/DB or based on include fieldMap are added to original tuple. - * - * Properties:<br> - * <b>inputClass</b>: Class to be loaded for the incoming data type<br> - * <b>outputClass</b>: Class to be loaded for the emitted data type<br> - * <br> - * - * Example - * The file contains data in json format, one entry per line. during setup entire file is read and - * kept in memory for quick lookup. - * If file contains following lines, and operator is configured with lookup key "productId" - * { "productId": 1, "productCategory": 3 } - * { "productId": 4, "productCategory": 10 } - * { "productId": 3, "productCategory": 1 } - * - * And input tuple is - * { amount=10.0, channelId=4, productId=3 } - * - * The tuple is modified as below before operator emits it on output port. - * { amount=10.0, channelId=4, productId=3, productCategory=1 } - * - * @displayName BeanEnrichment - * @category Database - * @tags enrichment, lookup - * - * @since 2.1.0 - */ -public class POJOEnrichmentOperator extends AbstractEnrichmentOperator<Object, Object> { - - private transient static final Logger logger = LoggerFactory.getLogger(POJOEnrichmentOperator.class); - protected Class inputClass; - protected Class outputClass; - private transient List<Getter> getters = new LinkedList<Getter>(); - private transient List<FieldObjectMap> fieldMap = new LinkedList<FieldObjectMap>(); - private transient List<Setter> updateSetter = new LinkedList<Setter>(); - - @NotNull - protected String outputClassStr; - - - @Override - protected Object getKey(Object tuple) { - ArrayList<Object> keyList = new ArrayList<Object>(); - for(Getter g : getters) { - keyList.add(g.get(tuple)); - } - return keyList; - } - - @Override - protected Object convert(Object in, Object cached) { - try { - Object o = outputClass.newInstance(); - - // Copy the fields from input to output - for (FieldObjectMap map : fieldMap) { - 
map.set.set(o, map.get.get(in)); - } - - if (cached == null) - return o; - - if(updateSetter.size() == 0 && includeFields.size() != 0) { - populateUpdatesFrmIncludeFields(); - } - ArrayList<Object> newAttributes = (ArrayList<Object>)cached; - int idx = 0; - for(Setter s: updateSetter) { - s.set(o, newAttributes.get(idx)); - idx++; - } - return o; - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } - - @Override - public void setup(Context.OperatorContext context) { - super.setup(context); - populateUpdatesFrmIncludeFields(); - } - - private void populateGettersFrmLookup() - { - for (String fName : lookupFields) { - Getter f = PojoUtils.createGetter(inputClass, fName, Object.class); - getters.add(f); - } - } - - private void populateGettersFrmInput() - { - Field[] fields = inputClass.getFields(); - for (Field f : fields) { - Class c = ClassUtils.primitiveToWrapper(f.getType()); - FieldObjectMap fieldMap = new FieldObjectMap(); - fieldMap.get = PojoUtils.createGetter(inputClass, f.getName(), c); - try { - fieldMap.set = PojoUtils.createSetter(outputClass, f.getName(), c); - } catch (Throwable e) { - throw new RuntimeException("Failed to initialize Output Class for field: " + f.getName(), e); - } - this.fieldMap.add(fieldMap); - } - } - - private void populateUpdatesFrmIncludeFields() { - if (this.outputClass == null) { - logger.debug("Creating output class instance from string: {}", outputClassStr); - try { - this.outputClass = this.getClass().getClassLoader().loadClass(outputClassStr); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - - for (String fName : includeFields) { - try { - Field f = outputClass.getField(fName); - Class c; - if(f.getType().isPrimitive()) { - c = ClassUtils.primitiveToWrapper(f.getType()); - } else { - c = f.getType(); - } - try { - updateSetter.add(PojoUtils.createSetter(outputClass, f.getName(), c)); - } catch 
(Throwable e) { - throw new RuntimeException("Failed to initialize Output Class for field: " + f.getName(), e); - } - } catch (NoSuchFieldException e) { - throw new RuntimeException("Cannot find field '" + fName + "' in output class", e); - } - } - } - - public String getOutputClassStr() - { - return outputClassStr; - } - - public void setOutputClassStr(String outputClassStr) - { - this.outputClassStr = outputClassStr; - } - - @Override protected void processTuple(Object tuple) - { - if (inputClass == null) { - inputClass = tuple.getClass(); - populateGettersFrmLookup(); - populateGettersFrmInput(); - } - super.processTuple(tuple); - } - - private class FieldObjectMap - { - public Getter get; - public Setter set; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java deleted file mode 100644 index 3357704..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (c) 2015 DataTorrent, Inc. - * All rights reserved. 
- */ -package com.datatorrent.contrib.enrichment; - -import java.util.List; -import java.util.Map; -/** - * @since 3.1.0 - */ - -public abstract class ReadOnlyBackup implements EnrichmentBackup -{ - protected transient List<String> includeFields; - protected transient List<String> lookupFields; - - @Override public void put(Object key, Object value) - { - throw new RuntimeException("Not supported operation"); - } - - @Override public void putAll(Map<Object, Object> m) - { - throw new RuntimeException("Not supported operation"); - } - - @Override public void remove(Object key) - { - throw new RuntimeException("Not supported operation"); - } - - @Override public void setFields(List<String> lookupFields,List<String> includeFields) - { - this.includeFields = includeFields; - this.lookupFields = lookupFields; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java deleted file mode 100644 index 7d5b4cd..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package com.datatorrent.contrib.enrichment; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java new file mode 100644 index 0000000..6015435 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/EmployeeOrder.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +public class EmployeeOrder +{ + public int OID; + public int ID; + public double amount; + public String NAME; + public int AGE; + public String ADDRESS; + public double SALARY; + + public int getOID() + { + return OID; + } + + public void setOID(int OID) + { + this.OID = OID; + } + + public int getID() + { + return ID; + } + + public void setID(int ID) + { + this.ID = ID; + } + + public int getAGE() + { + return AGE; + } + + public void setAGE(int AGE) + { + this.AGE = AGE; + } + + public String getNAME() + { + return NAME; + } + + public void setNAME(String NAME) + { + this.NAME = NAME; + } + + public double getAmount() + { + return amount; + } + + public void setAmount(double amount) + { + this.amount = amount; + } + + public String getADDRESS() + { + return ADDRESS; + } + + public void setADDRESS(String ADDRESS) + { + this.ADDRESS = ADDRESS; + } + + public double getSALARY() + { + return SALARY; + } + + public void setSALARY(double SALARY) + { + this.SALARY = SALARY; + } + + @Override + public String toString() + { + return "{" + + "OID=" + OID + + ", ID=" + ID + + ", amount=" + amount + + ", NAME='" + NAME + '\'' + + ", AGE=" + AGE + + ", ADDRESS='" + ADDRESS.trim() + 
'\'' + + ", SALARY=" + SALARY + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java new file mode 100644 index 0000000..f24a13c --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.enrich; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.Arrays; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; + +import com.esotericsoftware.kryo.Kryo; +import com.google.common.collect.Maps; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class FileEnrichmentTest +{ + + @Rule + public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo(); + + @Test + public void testEnrichmentOperator() throws IOException, InterruptedException + { + URL origUrl = this.getClass().getResource("/productmapping.txt"); + + URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping1.txt"); + FileUtils.deleteQuietly(new File(fileUrl.getPath())); + FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath())); + + MapEnricher oper = new MapEnricher(); + FSLoader store = new FSLoader(); + store.setFileName(fileUrl.toString()); + oper.setLookupFields(Arrays.asList("productId")); + oper.setIncludeFields(Arrays.asList("productCategory")); + oper.setStore(store); + + oper.setup(null); + + /* File contains 6 entries, but operator one entry is duplicate, + * so cache should contains only 5 entries after scanning input file. 
+ */ + //Assert.assertEquals("Number of mappings ", 7, oper.cache.size()); + + CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + CollectorTestSink<Object> tmp = (CollectorTestSink)sink; + oper.output.setSink(tmp); + + oper.activate(null); + + oper.beginWindow(0); + Map<String, Object> tuple = Maps.newHashMap(); + tuple.put("productId", 3); + tuple.put("channelId", 4); + tuple.put("amount", 10.0); + + Kryo kryo = new Kryo(); + oper.input.process(kryo.copy(tuple)); + + oper.endWindow(); + + oper.deactivate(); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size()); + Map<String, Object> emitted = sink.collectedTuples.iterator().next(); + + /* The fields present in original event is kept as it is */ + Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size()); + Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId")); + Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId")); + Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount")); + + /* Check if productCategory is added to the event */ + Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory")); + Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory")); + Assert.assertTrue(emitted.get("productCategory") instanceof Integer); + } +} +