This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e32b4e9 [GOBBLIN-1198] status cleaner
e32b4e9 is described below
commit e32b4e9653da1ef329c562d3c848ccbd8981da5a
Author: Arjun <[email protected]>
AuthorDate: Thu Jun 18 10:50:34 2020 -0700
[GOBBLIN-1198] status cleaner
Closes #3044 from arjun4084346/cleanJobStatus
---
.../main/java/org/apache/gobblin/configuration/State.java | 2 --
.../gobblin/metastore/util/StateStoreCleanerRunnable.java | 1 +
.../gobblin/service/monitoring/KafkaJobStatusMonitor.java | 13 +++++++++++--
3 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
index 63ced0a..eab5665 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
@@ -20,9 +20,7 @@ package org.apache.gobblin.configuration;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.Set;
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
index cbc7f2e..b410622 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.util.ConfigUtils;
* A utility class that wraps the {@link StateStoreCleaner} implementation as a {@link Runnable}.
*/
@Slf4j
+@Deprecated
public class StateStoreCleanerRunnable implements Runnable {
private Properties properties;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 30766d5..ed879df 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -39,15 +39,18 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
@@ -99,7 +102,13 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
protected void startUp() {
super.startUp();
log.info("Scheduling state store cleaner..");
- scheduledExecutorService.scheduleAtFixedRate(new StateStoreCleanerRunnable(this.config), 300, 86400L, TimeUnit.SECONDS);
+ org.apache.gobblin.configuration.State state = new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(this.config));
+ state.setProp(ConfigurationKeys.JOB_ID_KEY, "GobblinServiceJobStatusCleanerJob");
+ state.setProp(ConfigurationKeys.TASK_ID_KEY, "GobblinServiceJobStatusCleanerTask");
+
+ TaskContext taskContext = new TaskContext(new WorkUnitState(WorkUnit.createEmpty(), state));
+ DatasetCleanerTask cleanerTask = new DatasetCleanerTask(taskContext);
+ scheduledExecutorService.scheduleAtFixedRate(cleanerTask, 300L, 86400L, TimeUnit.SECONDS);
}
@Override