[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247551#comment-15247551
 ] 

ASF GitHub Bot commented on APEXMALHAR-2023:
--------------------------------------------

Github user sandeepdeshmukh commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r60210490
  
    --- Diff: 
contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java ---
    @@ -0,0 +1,319 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.lib.db.cache.CacheManager;
    +import com.datatorrent.lib.db.cache.CacheStore;
    +import com.datatorrent.lib.util.FieldInfo;
    +import com.datatorrent.lib.util.FieldInfo.SupportType;
    +
    +/**
    + * Base class for Enrichment Operator.  Subclasses should provide 
implementation to getKey and convert.
    + * The operator receives a tuple and emits enriched tuple based on 
includeFields and lookupFields. <br/>
    + * <p>
    + * Properties:<br>
    + * <b>lookupFields</b>: List of comma separated keys for quick searching. 
Ex: Field1,Field2,Field3<br>
    + * <b>includeFields</b>: List of comma separated fields to be 
replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
    + * <b>store</b>: Specify the type of loader for looking data<br>
    + * <br>
    + *
    + * @param <INPUT>  Type of tuples which are received by this operator
    + * @param <OUTPUT> Type of tuples which are emitted by this operator
    + * @displayName Abstract Enrichment Operator
    + * @tags Enrichment
    + */
    +@InterfaceStability.Evolving
    +public abstract class AbstractEnricher<INPUT, OUTPUT> extends BaseOperator 
implements Operator.ActivationListener
    +{
    +  /**
    +   * Mandatory parameters for Enricher
    +   */
    +  @NotNull
    +  protected List<String> lookupFields;
    +  @NotNull
    +  protected List<String> includeFields;
    +  @NotNull
    +  private BackendLoader store;
    +
    +  /**
    +   * Optional parameters for enricher.
    +   */
    +  private int cacheExpirationInterval = 24 * 60 * 60 * 1000;
    +  private int cacheCleanupInterval = 24 * 60 * 60 * 1000;
    +  private int cacheSize = 1024;
    +
    +  /**
    +   * Helper variables.
    +   */
    +  private transient CacheManager cacheManager;
    +  protected transient List<FieldInfo> lookupFieldInfo = new ArrayList<>();
    +  protected transient List<FieldInfo> includeFieldInfo = new ArrayList<>();
    +
    +  /**
    +   * This method needs to be called by implementing class for processing a 
tuple for enrichment.
    +   * The method will take the tuple through the following stages:
    +   * <ol>
    +   * <li>Call {@link #getKey(Object)} to retrieve key fields for 
lookup</li>
    +   * <li>Using key fields call cache manager to retrieve for any key that 
is cached already</li>
    +   * <li>If not found in cache, it'll do a lookup in configured backend 
store</li>
    +   * <li>The retrieved fields will be passed to {@link #convert(Object, 
Object)} method to create the final object</li>
    +   * <li>Finally {@link #emitTuple(Object)} is called for emitting the 
tuple</li>
    +   * </ol>
    +   *
    +   * @param tuple Input tuple that needs to get processed for enrichment.
    +   */
    +  protected void enrichTuple(INPUT tuple)
    +  {
    +    Object key = getKey(tuple);
    +    if (key != null) {
    +      Object result = cacheManager.get(key);
    +      OUTPUT out = convert(tuple, result);
    +      emitTuple(out);
    +    }
    +  }
    +
    +  /**
    +   * The method should be implemented by concrete class which returns an 
ArrayList<Object> containing all the fields
    +   * which forms key part of lookup.
    +   * The order of field values should be same as the one set in {@link 
#lookupFields} variable.
    +   *
    +   * @param tuple Input tuple from which fields values for key needs to be 
fetched.
    +   * @return Should return ArrayList<Object> which has fields values 
forming keys in same order as {@link #lookupFields}
    +   */
    +  protected abstract Object getKey(INPUT tuple);
    +
    +  /**
    +   * The method should be implemented by concrete class.
    +   * This method is expected to take the input tuple and an externally fetched 
object containing the fields to be enriched, and
    +   * return an enriched tuple which is ready to be emitted.
    +   *
    +   * @param in     Input tuple which needs to be enriched.
    +   * @param cached ArrayList<Object> containing missing data retrieved 
from external sources.
    +   * @return Enriched tuple of type OUTPUT
    +   */
    +  protected abstract OUTPUT convert(INPUT in, Object cached);
    +
    +  /**
    +   * This method should be implemented by concrete class.
    +   * The method is expected to emit tuple of type OUTPUT
    +   *
    +   * @param tuple Tuple of type OUTPUT that should be emitted.
    +   */
    +  protected abstract void emitTuple(OUTPUT tuple);
    +
    +  /**
    +   * This method should be implemented by concrete method.
    +   * The method should return Class type of field for given fieldName from 
output tuple.
    +   *
    +   * @param fieldName Field name for which field type needs to be 
identified
    +   * @return Class type for given field.
    +   */
    +  protected abstract Class<?> getIncludeFieldType(String fieldName);
    +
    +  /**
    +   * This method should be implemented by concrete method.
    +   * The method should return Class type of field for given fieldName from 
input tuple.
    +   *
    +   * @param fieldName Field name for which field type needs to be 
identified
    +   * @return Class type for given field.
    +   */
    +  protected abstract Class<?> getLookupFieldType(String fieldName);
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +
    +    cacheManager = new NullValuesCacheManager();
    +    CacheStore primaryCache = new CacheStore();
    +
    +    // set expiration to one day.
    +    primaryCache.setEntryExpiryDurationInMillis(cacheExpirationInterval);
    +    primaryCache.setCacheCleanupInMillis(cacheCleanupInterval);
    +    
primaryCache.setEntryExpiryStrategy(CacheStore.ExpiryType.EXPIRE_AFTER_WRITE);
    +    primaryCache.setMaxCacheSize(cacheSize);
    +
    +    cacheManager.setPrimary(primaryCache);
    +    cacheManager.setBackup(store);
    +  }
    +
    +  @Override
    +  public void activate(Context context)
    +  {
    +    for (String s : lookupFields) {
    +      lookupFieldInfo.add(new FieldInfo(s, s, 
SupportType.getFromJavaType(getLookupFieldType(s))));
    +    }
    +
    +    if (includeFields != null) {
    +      for (String s : includeFields) {
    +        includeFieldInfo.add(new FieldInfo(s, s, 
SupportType.getFromJavaType(getIncludeFieldType(s))));
    +      }
    +    }
    +
    +    store.setFieldInfo(lookupFieldInfo, includeFieldInfo);
    +
    +    try {
    +      cacheManager.initialize();
    +    } catch (IOException e) {
    +      throw new RuntimeException("Unable to initialize primary cache", e);
    +    }
    +  }
    +
    +  @Override
    +  public void deactivate()
    +  {
    --- End diff --
    
    Close any resources here, such as the DB connection held by the cacheManager.


> Adding Enrichment Operator to Malhar
> ------------------------------------
>
>                 Key: APEXMALHAR-2023
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2023
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>    Affects Versions: 3.3.1
>            Reporter: Chinmay Kolhatkar
>            Assignee: Chinmay Kolhatkar
>
> Add Enrichment Operator to Apex Malhar.
> Discussion is happening in mailing list here:
> http://mail-archives.apache.org/mod_mbox/incubator-apex-dev/201603.mbox/%3CCAKJfLDMo24-Gcvum2ZL8-0JOnE8QLryAy0Zu_R5zhMd_bsJyHw%40mail.gmail.com%3E
> Ponymail permalink:
> https://pony-poc.apache.org/thread.html/Z8t5ut5pu5vprgt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to