This is an automated email from the ASF dual-hosted git repository.
priyag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new 712027a APEXMALHAR-2535: change type of timeout variables from int to
long Fix NPE during teardown, when refreshtime is not set Allow setting
expiration type
712027a is described below
commit 712027aec6388cb2af7709c63c6c59fa82a94307
Author: Tushar R. Gosavi <[email protected]>
AuthorDate: Tue Aug 8 12:33:41 2017 +0530
APEXMALHAR-2535: change type of timeout variables from int to long
Fix NPE during teardown, when refreshtime is not set
Allow setting expiration type
---
.../contrib/enrich/AbstractEnricher.java | 27 ++++++++++++++++------
.../com/datatorrent/lib/db/cache/CacheStore.java | 8 +++----
2 files changed, 24 insertions(+), 11 deletions(-)
diff --git
a/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
index de9e2c7..c377b96 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/AbstractEnricher.java
@@ -25,11 +25,13 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.NotNull;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.db.cache.CacheManager;
import com.datatorrent.lib.db.cache.CacheStore;
+import com.datatorrent.lib.db.cache.CacheStore.ExpiryType;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.FieldInfo.SupportType;
@@ -66,8 +68,9 @@ public abstract class AbstractEnricher<INPUT, OUTPUT> extends
BaseOperator imple
/**
* Optional parameters for enricher.
*/
- private int cacheExpirationInterval = 1 * 60 * 60 * 1000; // 1 hour
- private int cacheCleanupInterval = 1 * 60 * 60 * 1000; // 1 hour
+ private long cacheExpirationInterval = 1 * 60 * 60 * 1000; // 1 hour
+ private long cacheCleanupInterval = 1 * 60 * 60 * 1000; // 1 hour
+ private ExpiryType expiryType = ExpiryType.EXPIRE_AFTER_WRITE;
private int cacheSize = 1024; // 1024 records
/**
@@ -160,7 +163,7 @@ public abstract class AbstractEnricher<INPUT, OUTPUT>
extends BaseOperator imple
// set expiration to one day.
primaryCache.setEntryExpiryDurationInMillis(cacheExpirationInterval);
primaryCache.setCacheCleanupInMillis(cacheCleanupInterval);
-
primaryCache.setEntryExpiryStrategy(CacheStore.ExpiryType.EXPIRE_AFTER_WRITE);
+ primaryCache.setEntryExpiryStrategy(expiryType);
primaryCache.setMaxCacheSize(cacheSize);
cacheManager.setPrimary(primaryCache);
@@ -268,7 +271,7 @@ public abstract class AbstractEnricher<INPUT, OUTPUT>
extends BaseOperator imple
*
* @return Cache entry expiration interval in ms
*/
- public int getCacheExpirationInterval()
+ public long getCacheExpirationInterval()
{
return cacheExpirationInterval;
}
@@ -279,7 +282,7 @@ public abstract class AbstractEnricher<INPUT, OUTPUT>
extends BaseOperator imple
*
* @param cacheExpirationInterval Cache entry expiration interval in ms
*/
- public void setCacheExpirationInterval(int cacheExpirationInterval)
+ public void setCacheExpirationInterval(long cacheExpirationInterval)
{
this.cacheExpirationInterval = cacheExpirationInterval;
}
@@ -290,7 +293,7 @@ public abstract class AbstractEnricher<INPUT, OUTPUT>
extends BaseOperator imple
*
* @return cache cleanup interval in ms.
*/
- public int getCacheCleanupInterval()
+ public long getCacheCleanupInterval()
{
return cacheCleanupInterval;
}
@@ -301,7 +304,7 @@ public abstract class AbstractEnricher<INPUT, OUTPUT>
extends BaseOperator imple
*
* @param cacheCleanupInterval cache cleanup interval in ms.
*/
- public void setCacheCleanupInterval(int cacheCleanupInterval)
+ public void setCacheCleanupInterval(long cacheCleanupInterval)
{
this.cacheCleanupInterval = cacheCleanupInterval;
}
@@ -326,6 +329,16 @@ public abstract class AbstractEnricher<INPUT, OUTPUT>
extends BaseOperator imple
this.cacheSize = cacheSize;
}
+ public ExpiryType getExpiryType()
+ {
+ return expiryType;
+ }
+
+ public void setExpiryType(ExpiryType expiryType)
+ {
+ this.expiryType = expiryType;
+ }
+
public CacheManager getCacheManager()
{
return cacheManager;
diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
index 1b43085..c073aff 100644
--- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java
@@ -58,10 +58,10 @@ public class CacheStore implements CacheManager.Primary,
Component<CacheManager.
protected long maxCacheSize = 2000;
@Min(0)
- protected int entryExpiryDurationInMillis = 60000; //1 minute
+ protected long entryExpiryDurationInMillis = 60000; //1 minute
@Min(0)
- protected int cacheCleanupIntervalInMillis = 60500; //.5 seconds after
entries are expired
+ protected long cacheCleanupIntervalInMillis = 60500; //.5 seconds after
entries are expired
@NotNull
protected ExpiryType entryExpiryStrategy = ExpiryType.EXPIRE_AFTER_ACCESS;
@@ -190,7 +190,7 @@ public class CacheStore implements CacheManager.Primary,
Component<CacheManager.
*
* @param durationInMillis the duration after which a cache entry is expired.
*/
- public void setEntryExpiryDurationInMillis(int durationInMillis)
+ public void setEntryExpiryDurationInMillis(long durationInMillis)
{
this.entryExpiryDurationInMillis = durationInMillis;
}
@@ -200,7 +200,7 @@ public class CacheStore implements CacheManager.Primary,
Component<CacheManager.
*
* @param durationInMillis the duration after which cache is cleaned up
regularly.
*/
- public void setCacheCleanupInMillis(int durationInMillis)
+ public void setCacheCleanupInMillis(long durationInMillis)
{
this.cacheCleanupIntervalInMillis = durationInMillis;
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].