- add method to clear workflow instances from workflow inst repo - add unit tests
Project: http://git-wip-us.apache.org/repos/asf/oodt/repo Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/d49d72a7 Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/d49d72a7 Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/d49d72a7 Branch: refs/heads/feature/zookeeper-config Commit: d49d72a7837b55b22a74305d99dce5644fb8a860 Parents: e1b3063 Author: Chris Mattmann <[email protected]> Authored: Mon Jul 24 23:13:00 2017 -0700 Committer: Chris Mattmann <[email protected]> Committed: Mon Jul 24 23:13:00 2017 -0700 ---------------------------------------------------------------------- .../DataSourceWorkflowInstanceRepository.java | 54 ++++++++++++++++++++ .../LuceneWorkflowInstanceRepository.java | 37 ++++++++++++++ .../MemoryWorkflowInstanceRepository.java | 6 +++ .../instrepo/WorkflowInstanceRepository.java | 8 +++ .../workflow/system/XmlRpcWorkflowManager.java | 8 +++ .../system/XmlRpcWorkflowManagerClient.java | 6 +++ .../TestLuceneWorkflowInstanceRepository.java | 19 +++++++ 7 files changed, 138 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oodt/blob/d49d72a7/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/DataSourceWorkflowInstanceRepository.java ---------------------------------------------------------------------- diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/DataSourceWorkflowInstanceRepository.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/DataSourceWorkflowInstanceRepository.java index 05688bd..2fe1b15 100644 --- a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/DataSourceWorkflowInstanceRepository.java +++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/DataSourceWorkflowInstanceRepository.java @@ -174,6 +174,60 @@ public class DataSourceWorkflowInstanceRepository extends } } + + + @Override + public synchronized boolean clearWorkflowInstances() throws InstanceRepositoryException { + Connection conn = null; + Statement statement = null; + + try { + conn = dataSource.getConnection(); + conn.setAutoCommit(false); + statement = conn.createStatement(); + + String deleteSql = "DELETE FROM workflow_instances"; + + LOG.log(Level.FINE, "deleteSql: Executing: " + + deleteSql); + statement.execute(deleteSql); + conn.commit(); + + } catch (Exception e) { + LOG.log(Level.SEVERE, e.getMessage()); + LOG.log(Level.WARNING, + "Exception deleting all workflow instances. Message: " + + e.getMessage()); + try { + if (conn != null) { + conn.rollback(); + } + } catch (SQLException e2) { + LOG.log(Level.SEVERE, + "Unable to rollback delete workflow instances " + + "transaction. Message: " + e2.getMessage()); + } + throw new InstanceRepositoryException(e.getMessage()); + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException ignore) { + } + + } + + if (conn != null) { + try { + conn.close(); + + } catch (SQLException ignore) { + } + + } + } + return true; + } /* * (non-Javadoc) http://git-wip-us.apache.org/repos/asf/oodt/blob/d49d72a7/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java ---------------------------------------------------------------------- diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java index c3bf291..af0975a 100644 --- a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java +++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/LuceneWorkflowInstanceRepository.java @@ -24,6 +24,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.index.*; +import org.apache.lucene.queryParser.QueryParser; import org.apache.lucene.search.*; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; @@ -308,6 +309,42 @@ public class LuceneWorkflowInstanceRepository extends return wInsts; } + + @Override + public synchronized boolean clearWorkflowInstances() throws InstanceRepositoryException { + IndexWriter writer = null; + try { + IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer()); + config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); + LogMergePolicy lmp =new LogDocMergePolicy(); + lmp.setMergeFactor(mergeFactor); + config.setMergePolicy(lmp); + + writer = new IndexWriter(indexDir, config); + LOG.log(Level.FINE, + "LuceneWorkflowEngine: remove all workflow instances"); + writer.deleteDocuments(new Term("myfield", "myvalue")); + } catch (IOException e) { + LOG.log(Level.SEVERE, e.getMessage()); + LOG + .log(Level.WARNING, + "Exception removing workflow instances from index: Message: " + + e.getMessage()); + throw new InstanceRepositoryException(e.getMessage()); + } finally { + if (writer != null){ + try{ + writer.close(); + } + catch(Exception ignore){} + + writer = null; + } + + } + + return true; + } /* * (non-Javadoc) http://git-wip-us.apache.org/repos/asf/oodt/blob/d49d72a7/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/MemoryWorkflowInstanceRepository.java ---------------------------------------------------------------------- diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/MemoryWorkflowInstanceRepository.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/MemoryWorkflowInstanceRepository.java index 5ee9b95..876733b 100644 --- a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/MemoryWorkflowInstanceRepository.java +++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/MemoryWorkflowInstanceRepository.java @@ -218,4 +218,10 @@ public class MemoryWorkflowInstanceRepository extends return cnt; } + @Override + public synchronized boolean clearWorkflowInstances() throws InstanceRepositoryException { + this.workflowInstMap.clear(); + return true; + } + } http://git-wip-us.apache.org/repos/asf/oodt/blob/d49d72a7/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/WorkflowInstanceRepository.java ---------------------------------------------------------------------- diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/WorkflowInstanceRepository.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/WorkflowInstanceRepository.java index d6e0fd0..4ce2ffb 100644 --- a/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/WorkflowInstanceRepository.java +++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/instrepo/WorkflowInstanceRepository.java @@ -137,5 +137,13 @@ public interface WorkflowInstanceRepository extends Pagination { * @throws InstanceRepositoryException If there is any error that occurs. */ int getNumWorkflowInstancesByStatus(String status) throws InstanceRepositoryException; + + /** + * Clears the instance repository of all workflows. + * @return False if there was any error (logged), and True otherwise. + * @throws InstanceRepositoryException If there was some IO or other error deleting + * workflow instances that was unrecoverable from. + */ + public boolean clearWorkflowInstances() throws InstanceRepositoryException; } http://git-wip-us.apache.org/repos/asf/oodt/blob/d49d72a7/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java ---------------------------------------------------------------------- diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java index 0256602..37b22d3 100644 --- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java +++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java @@ -496,6 +496,14 @@ public class XmlRpcWorkflowManager { return null; } } + + public synchronized boolean clearWorkflowInstances() throws InstanceRepositoryException{ + String numInsts = String.valueOf(this.getNumWorkflowInstances()); + LOG.info("Removing ["+numInsts+"] total workflow " + + "instances from the instance repository."); + this.engine.getInstanceRepository().clearWorkflowInstances(); + return true; + } public List getWorkflows() throws RepositoryException { List workflowList = repo.getWorkflows(); http://git-wip-us.apache.org/repos/asf/oodt/blob/d49d72a7/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java ---------------------------------------------------------------------- diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java index b6355a8..fc1b618 100644 --- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java +++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java @@ -351,6 +351,12 @@ public class XmlRpcWorkflowManagerClient { .execute("workflowmgr.handleEvent", argList); } + + public boolean clearWorkflowInstances() throws XmlRpcException, IOException{ + Vector argList = new Vector(); + + return (Boolean)client.execute("workflowmgr.clearWorkflowInstances", argList); + } public WorkflowTask getTaskById(String taskId) throws XmlRpcException, IOException, RepositoryException { Vector argList = new Vector(); http://git-wip-us.apache.org/repos/asf/oodt/blob/d49d72a7/workflow/src/test/java/org/apache/oodt/cas/workflow/instrepo/TestLuceneWorkflowInstanceRepository.java ---------------------------------------------------------------------- diff --git a/workflow/src/test/java/org/apache/oodt/cas/workflow/instrepo/TestLuceneWorkflowInstanceRepository.java b/workflow/src/test/java/org/apache/oodt/cas/workflow/instrepo/TestLuceneWorkflowInstanceRepository.java index 19147a8..7c8bec4 100644 --- a/workflow/src/test/java/org/apache/oodt/cas/workflow/instrepo/TestLuceneWorkflowInstanceRepository.java +++ b/workflow/src/test/java/org/apache/oodt/cas/workflow/instrepo/TestLuceneWorkflowInstanceRepository.java @@ -176,6 +176,25 @@ public class TestLuceneWorkflowInstanceRepository extends TestCase implements fail(e.getMessage()); } } + + public void testClearInstances(){ + try{ + repo.addWorkflowInstance(testInst); + assertEquals(1, repo.getNumWorkflowInstances()); + repo.clearWorkflowInstances(); + assertEquals(0, repo.getNumWorkflowInstances()); + + for(int i=0; i < 25; i++){ + repo.addWorkflowInstance(testInst); + } + assertEquals(25, repo.getNumWorkflowInstances()); + repo.clearWorkflowInstances(); + assertEquals(0, repo.getNumWorkflowInstances()); + } + catch(InstanceRepositoryException e){ + fail(e.getLocalizedMessage()); + } + } public void testUpdateDocumentAndPreserveId() { try {
