This is an automated email from the ASF dual-hosted git repository. mattmann pushed a commit to branch gsoc18 in repository https://gitbox.apache.org/repos/asf/drat.git
commit abce3831775d3af5b97557cd15ab2d1030e9be2b Author: Chris Mattmann <[email protected]> AuthorDate: Sun Aug 5 16:02:18 2018 -0700 - refactor and clean up reduce --- .../src/main/java/backend/ProcessDratWrapper.java | 115 ++++++++++----------- .../test/java/backend/TestProcessDratWrapper.java | 8 +- 2 files changed, 60 insertions(+), 63 deletions(-) diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java index a068685..06e1ff8 100644 --- a/proteus/src/main/java/backend/ProcessDratWrapper.java +++ b/proteus/src/main/java/backend/ProcessDratWrapper.java @@ -70,9 +70,9 @@ public class ProcessDratWrapper extends GenericProcess private static final String MAP_CMD = "map"; private static final String REDUCE_CMD = "reduce"; private static final String STATUS_IDLE = "idle"; - private static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner"; - private static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit"; - private static final String REDUCE_TASK_ID = "urn:drat:RatAggregator"; + protected static final String PARTITION_AND_MAP_TASK_ID = "urn:drat:MimePartitioner"; + protected static final String MAPPER_TASK_ID = "urn:drat:RatCodeAudit"; + protected static final String REDUCE_TASK_ID = "urn:drat:RatAggregator"; private static final String[] WIPE_TYPES = { "RatLog", "GenericFile", "RatAggregateLog" }; @@ -179,18 +179,18 @@ public class ProcessDratWrapper extends GenericProcess @Override public void reduce() throws IOException { setStatus(REDUCE_CMD); - DratLog mapLog = new DratLog("REDUCING"); + DratLog reduceLog = new DratLog("REDUCING"); WorkflowRestResource restResource = new WorkflowRestResource(); DynamicWorkflowRequestWrapper requestBody = new DynamicWorkflowRequestWrapper(); requestBody.taskIds = new ArrayList<>(); requestBody.taskIds.add(REDUCE_TASK_ID); LOG.info("STARTING REDUCING"); - mapLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID); + reduceLog.logInfo("STARTING", " (dynamic workflow with task "+REDUCE_TASK_ID); String resp = (String)restResource.performDynamicWorkFlow(requestBody); if(resp.equals("OK")) { - mapLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow"); + reduceLog.logInfo("STARTED SUCCESSFULLY, "+REDUCE_TASK_ID+" dynamic workflow"); }else { - mapLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp); + reduceLog.logSevere("FAILED", "Dynamic workflow starting failed "+resp); throw new IOException(resp); } } @@ -257,15 +257,19 @@ public class ProcessDratWrapper extends GenericProcess this.map(); // don't run reduce until all maps are done - while (mapsStillRunning()) { + while (stillRunning(PARTITION_AND_MAP_TASK_ID) || stillRunning(MAPPER_TASK_ID)) { Thread.sleep(DRAT_PROCESS_WAIT_DURATION); LOG.info("MAP STILL RUNNING"); } // you're not done until the final log is generated. while (!hasAggregateRatLog()) { try { - reduce(); - LOG.info("REDUCE STILL RUNNING"); + if (!stillRunning(REDUCE_TASK_ID)) { + reduce(); + } + else { + LOG.info("REDUCE STILL RUNNING."); + } } catch (IOException e) { LOG.warning("Fired reduce off before mappers were done. Sleeping: [" + String.valueOf(DRAT_PROCESS_WAIT_DURATION / 1000) @@ -287,15 +291,16 @@ public class ProcessDratWrapper extends GenericProcess + "]: " + breakStatus); return numLogs > 0; } + + private boolean stillRunning(String taskId) throws Exception { + WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL); + List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances(); + for(WorkflowInstance instance : workflowInstances){ + LOG.info("Running Instances : id: "+instance.getId() + +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName()); + } + return taskStillRunning(workflowInstances, taskId); - private boolean mapsStillRunning() throws Exception { - WorkflowManagerUtils workflowManagerUtils = new WorkflowManagerUtils(FileConstants.CLIENT_URL); - List<WorkflowInstance> workflowInstances = workflowManagerUtils.getClient().getWorkflowInstances(); - for(WorkflowInstance instance : workflowInstances){ - LOG.info("Running Instances : id: "+instance.getId() - +" state name "+instance.getState().getName()+" current task name : "+instance.getCurrentTask().getTaskName()); - } - return stillRunning(workflowInstances); } @VisibleForTesting @@ -340,62 +345,52 @@ public class ProcessDratWrapper extends GenericProcess } return items; } - - @VisibleForTesting - protected boolean stillRunning(List<WorkflowInstance> instances) { - List<WorkflowInstance> partitionInstances = filterPartitioners(instances); - List<WorkflowInstance> mapperInstances = filterMappers(instances); - LOG.info("Checking partitioners: inspecting ["+String.valueOf(partitionInstances - .size()) + "] partitioners."); - for (WorkflowInstance partitionInstance: partitionInstances) { - if (isRunning(partitionInstance.getState().getName())) { - LOG.info("Partitioner: [" + partitionInstance.getId() + "] still running."); - return true; - } - } - - LOG.info("Checking mappers: inspecting [" - + String.valueOf(mapperInstances.size()) + "] mappers."); - for (WorkflowInstance mapperInstance : mapperInstances) { - if (isRunning(mapperInstance.getState().getName())) { - LOG.info("Mapper: [" + mapperInstance.getId() + "] still running."); - return true; + + protected boolean taskStillRunning(List<WorkflowInstance> instances, String ...taskIds) { + if (taskIds != null && taskIds.length > 0) { + for(String taskId: taskIds) { + List<WorkflowInstance> insts = filterInstances(instances, taskId); + LOG.info("Checking task: "+taskId+" : inspecting ["+String.valueOf(instances.size())+"] tasks."); + for(WorkflowInstance i: insts) { + if(isRunning(i.getState().getName())) { + LOG.info("Task: [" + i.getId() + "] still running."); + return true; + } + } } } + return false; } @VisibleForTesting + @Deprecated protected List<WorkflowInstance> filterPartitioners(List<WorkflowInstance> instances){ - List<WorkflowInstance> partitioners = new ArrayList<>(); + return filterInstances(instances, PARTITION_AND_MAP_TASK_ID); + } + + @VisibleForTesting + @Deprecated + protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){ + return this.filterInstances(instances, MAPPER_TASK_ID); + } + + @VisibleForTesting + protected List<WorkflowInstance> filterInstances(List<WorkflowInstance> instances, String taskId){ + List<WorkflowInstance> insts = new ArrayList<>(); if(instances!=null && instances.size()>0){ for(WorkflowInstance instance:instances){ - if (instance.getCurrentTask().getTaskId().equals(PARTITION_AND_MAP_TASK_ID)) { - LOG.info("Adding partition/map: ["+instance.getCurrentTask().getTaskId()+"]"); - partitioners.add(instance); + if(instance.getCurrentTask().getTaskId().equals(taskId)){ + LOG.info("Adding "+taskId+" instance: [" + instance.getCurrentTask().getTaskId() + "]"); + insts.add(instance); }else{ LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]"); } } } - return partitioners; - } - - @VisibleForTesting - protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){ - List<WorkflowInstance> mappers = new ArrayList<>(); - if(instances!=null && instances.size()>0){ - for(WorkflowInstance instance:instances){ - if(instance.getCurrentTask().getTaskId().equals(MAPPER_TASK_ID)){ - LOG.info("Adding mapper: [" + instance.getCurrentTask().getTaskId() + "]"); - mappers.add(instance); - }else{ - LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]"); - } - } - } - return mappers; - } + return insts; +} + @VisibleForTesting protected boolean isRunning(String status) { diff --git a/proteus/src/test/java/backend/TestProcessDratWrapper.java b/proteus/src/test/java/backend/TestProcessDratWrapper.java index 181837d..c816512 100644 --- a/proteus/src/test/java/backend/TestProcessDratWrapper.java +++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.List; import org.apache.oodt.cas.workflow.structs.WorkflowInstance; import backend.ProcessDratWrapper; +import static backend.ProcessDratWrapper.MAPPER_TASK_ID; +import static backend.ProcessDratWrapper.PARTITION_AND_MAP_TASK_ID; import junit.framework.TestCase; public class TestProcessDratWrapper extends TestCase { @@ -52,7 +54,7 @@ public class TestProcessDratWrapper extends TestCase { for(WorkflowItem wi: items) { insts.add(wi.toInstance()); } - assertTrue(wrapper.stillRunning(insts)); + assertTrue(wrapper.taskStillRunning(insts, PARTITION_AND_MAP_TASK_ID, MAPPER_TASK_ID)); } public void testFilterPartitioners(){ @@ -70,7 +72,7 @@ public class TestProcessDratWrapper extends TestCase { insts.add(wi.toInstance()); } List<WorkflowInstance> partitioners = null; - partitioners = wrapper.filterPartitioners(insts); + partitioners = wrapper.filterInstances(insts, PARTITION_AND_MAP_TASK_ID); assertNotNull(partitioners); assertEquals(2, partitioners.size()); } @@ -90,7 +92,7 @@ public class TestProcessDratWrapper extends TestCase { insts.add(wi.toInstance()); } List<WorkflowInstance> mappers = null; - mappers = wrapper.filterMappers(insts); + mappers = wrapper.filterInstances(insts, MAPPER_TASK_ID); assertNotNull(mappers); assertEquals(1, mappers.size()); }
