Repository: incubator-eagle
Updated Branches:
  refs/heads/master 473496747 -> 3d6a29ec2


[MINOR] Set task number of each bolt in HdfsAuditLog Application

If not set, the task number of every bolt is 8.

Author: Zhao, Qingwen <qingwz...@apache.org>

Closes #521 from qingwen220/quickFix.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3d6a29ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3d6a29ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3d6a29ec

Branch: refs/heads/master
Commit: 3d6a29ec2e67a83742158a08d378cfc5cad59814
Parents: 4734967
Author: Zhao, Qingwen <qingwz...@apache.org>
Authored: Mon Oct 17 22:45:19 2016 +0800
Committer: Zhao, Qingwen <qingwz...@apache.org>
Committed: Mon Oct 17 22:45:19 2016 +0800

----------------------------------------------------------------------
 .../auditlog/AbstractHdfsAuditLogApplication.java         | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3d6a29ec/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index b985daf..b9f480b 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -59,11 +59,11 @@ public abstract class AbstractHdfsAuditLogApplication 
extends StormApplication {
         int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
         int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
 
-        builder.setSpout("ingest", spout, numOfSpoutTasks);
+        builder.setSpout("ingest", spout, 
numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
 
 
         BaseRichBolt parserBolt = getParserBolt();
-        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, 
numOfParserTasks);
+        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, 
numOfParserTasks).setNumTasks(numOfParserTasks);
 
         Boolean useDefaultPartition = 
!config.hasPath("eagleProps.useDefaultPartition") || 
config.getBoolean("eagleProps.useDefaultPartition");
         if(useDefaultPartition){
@@ -73,15 +73,15 @@ public abstract class AbstractHdfsAuditLogApplication 
extends StormApplication {
         }
 
         HdfsSensitivityDataEnrichBolt sensitivityDataJoinBolt = new 
HdfsSensitivityDataEnrichBolt(config);
-        BoltDeclarer sensitivityDataJoinBoltDeclarer = 
builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, 
numOfSensitivityJoinTasks);
+        BoltDeclarer sensitivityDataJoinBoltDeclarer = 
builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, 
numOfSensitivityJoinTasks).setNumTasks(numOfSensitivityJoinTasks);
         sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new 
Fields("f1"));
 
         IPZoneDataEnrichBolt ipZoneDataJoinBolt = new 
IPZoneDataEnrichBolt(config);
-        BoltDeclarer ipZoneDataJoinBoltDeclarer = 
builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks);
+        BoltDeclarer ipZoneDataJoinBoltDeclarer = 
builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, 
numOfIPZoneJoinTasks).setNumTasks(numOfIPZoneJoinTasks);
         ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new 
Fields("user"));
 
         StormStreamSink sinkBolt = 
environment.getStreamSink("hdfs_audit_log_stream",config);
-        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", 
sinkBolt, numOfSinkTasks);
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", 
sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks);
         kafkaBoltDeclarer.fieldsGrouping("ipZoneJoin", new Fields("user"));
         return builder.createTopology();
 

Reply via email to