Repository: falcon Updated Branches: refs/heads/master 470f34313 -> d55cdd095
FALCON-2123 Make Blocking Queue configurable in LogMoverService Author: Praveen Adlakha <[email protected]> Reviewers: @pallavi-rao @sandeepSamudrala Closes #272 from PraveenAdlakha/2123 and squashes the following commits: d4fb9b9 [Praveen Adlakha] thread count changed in startup.properties fd0e244 [Praveen Adlakha] 2125 commit 1342d9d [Praveen Adlakha] FALCON-2123 Make Blocking Queue configurable in LogMoverService Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d55cdd09 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d55cdd09 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d55cdd09 Branch: refs/heads/master Commit: d55cdd0951bc2c5344ce5e9493c290e58d85234a Parents: 470f343 Author: Praveen Adlakha <[email protected]> Authored: Wed Aug 24 13:11:23 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Wed Aug 24 13:11:23 2016 +0530 ---------------------------------------------------------------------- common/src/main/resources/startup.properties | 5 +++-- .../falcon/oozie/OozieOrchestrationWorkflowBuilder.java | 10 +++++++++- .../falcon/oozie/feed/FSReplicationWorkflowBuilder.java | 2 +- .../falcon/oozie/feed/HCatReplicationWorkflowBuilder.java | 8 ++++---- .../oozie/process/ProcessExecutionWorkflowBuilder.java | 2 +- .../java/org/apache/falcon/service/LogMoverService.java | 7 ++++--- src/conf/startup.properties | 5 +++-- 7 files changed, 25 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 6c2ab5c..8d64c54 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -350,5 +350,6 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle # Property to remove postProcessing *.falcon.postprocessing.enable=true -### LogMoveService Thread count -*.falcon.logMoveService.threadCount=50 +### LogMoveService Properties +*.falcon.logMoveService.threadCount=200 +*.falcon.logMoveService.blockingQueue.length=50 http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java index 9683e62..8d45d7a 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java @@ -230,12 +230,20 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend wf.getDecisionOrForkOrJoin().add(kill); } + protected String getFailAction(){ + if (!isPostProcessingEnabled()){ + return FAIL_ACTION_NAME; + }else{ + return FAIL_POSTPROCESS_ACTION_NAME; + } + } + protected void addPostProcessing(WORKFLOWAPP workflow, ACTION action) throws FalconException{ if (!isPostProcessingEnabled()){ addTransition(action, OK_ACTION_NAME, FAIL_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(action); }else{ - addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); + addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(action); //Add post-processing actions http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java index 598cf6f..7cad507 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java @@ -48,7 +48,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder if (shouldPreProcess()) { ACTION action = getPreProcessingAction(false, Tag.REPLICATION); addHDFSServersConfig(action, src, target); - addTransition(action, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); + addTransition(action, REPLICATION_ACTION_NAME, getFailAction()); workflow.getDecisionOrForkOrJoin().add(action); start = PREPROCESS_ACTION_NAME; } http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java index f4eecb7..ba86c6e 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java @@ -73,7 +73,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild action.getJava().getConfiguration().getProperty().add(prop); } addHDFSServersConfig(action, src, target); - addTransition(action, EXPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); + addTransition(action, EXPORT_ACTION_NAME, getFailAction()); workflow.getDecisionOrForkOrJoin().add(action); start = PREPROCESS_ACTION_NAME; } @@ -95,14 +95,14 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild hiveExportAction.getConfiguration().getProperty().add(prop); } OozieUtils.marshalHiveAction(export, exportActionJaxbElement); - addTransition(export, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); + addTransition(export, REPLICATION_ACTION_NAME, getFailAction()); workflow.getDecisionOrForkOrJoin().add(export); //Add replication ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE); addHDFSServersConfig(replication, src, target); addAdditionalReplicationProperties(replication); - addTransition(replication, IMPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); + addTransition(replication, IMPORT_ACTION_NAME, getFailAction()); workflow.getDecisionOrForkOrJoin().add(replication); //Add import action @@ -122,7 +122,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild hiveImportAction.getConfiguration().getProperty().add(prop); } OozieUtils.marshalHiveAction(importAction, importActionJaxbElement); - addTransition(importAction, CLEANUP_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); + addTransition(importAction, CLEANUP_ACTION_NAME, getFailAction()); workflow.getDecisionOrForkOrJoin().add(importAction); //Add cleanup action http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java index c31b4ee..20eeffd 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java @@ -79,7 +79,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration //Add pre-processing action if (shouldPreProcess()) { ACTION preProcessAction = getPreProcessingAction(isTableStorageType, Tag.DEFAULT); - addTransition(preProcessAction, USER_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); + addTransition(preProcessAction, USER_ACTION_NAME, getFailAction()); wfApp.getDecisionOrForkOrJoin().add(preProcessAction); startAction = PREPROCESS_ACTION_NAME; } http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java index 1f3d0a0..ba4835d 100644 --- a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java +++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java @@ -44,12 +44,13 @@ public class LogMoverService implements WorkflowExecutionListener { public static final String ENABLE_POSTPROCESSING = StartupProperties.get(). getProperty("falcon.postprocessing.enable"); - private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(50); + private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(Integer.parseInt( + StartupProperties.get().getProperty("falcon.logMoveService.blockingQueue.length", "50"))); private ExecutorService executorService = new ThreadPoolExecutor(20, getThreadCount(), 120, TimeUnit.SECONDS, blockingQueue); public int getThreadCount() { try{ - return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount")); + return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount", "200")); } catch (NumberFormatException e){ LOG.error("Exception in LogMoverService", e); return 50; @@ -87,7 +88,7 @@ public class LogMoverService implements WorkflowExecutionListener { } while(0<blockingQueue.remainingCapacity()){ try { - LOG.info("Sleeing, no capacity in threadpool...."); + LOG.debug("Sleeping, no capacity in threadpool...."); TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 37e37bc..b663f04 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -367,5 +367,6 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ # Property to remove postProcessing *.falcon.postprocessing.enable=true -### LogMoveService Thread count -*.falcon.logMoveService.threadCount=50 +### LogMoveService Properties +*.falcon.logMoveService.threadCount=200 +*.falcon.logMoveService.blockingQueue.length=50
