Repository: airavata Updated Branches: refs/heads/lahiru/AIRAVATA-2065 46b8502eb -> 58183be10
Fix issues with the current implementation Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/58183be1 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/58183be1 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/58183be1 Branch: refs/heads/lahiru/AIRAVATA-2065 Commit: 58183be1070e95cfc83e6ef360ec2def5e670706 Parents: 4edad01 Author: Lahiru Ginnaliya Gamathige <[email protected]> Authored: Mon Aug 29 00:52:17 2016 -0700 Committer: Lahiru Ginnaliya Gamathige <[email protected]> Committed: Mon Aug 29 00:53:10 2016 -0700 ---------------------------------------------------------------------- .../airavata/common/logging/kafka/KafkaAppender.java | 12 +++++++++--- modules/distribution/pom.xml | 8 -------- .../java/org/apache/airavata/server/ServerMain.java | 4 +++- 3 files changed, 12 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/58183be1/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java index a4a6a7b..a9a6ffd 100644 --- a/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java +++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java @@ -51,20 +51,20 @@ public class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> { private ServerId serverId = null; - public KafkaAppender(String kafkaTopicPrefix, String kafkaHost) { + public KafkaAppender(String kafkaHost, String kafkaTopicPrefix) { Properties props = new Properties(); props.put("bootstrap.servers", kafkaHost); props.put("acks", "0"); props.put("retries", 0); props.put("batch.size", 16384); - props.put("linger.ms", 10000); // Send the batch every 5 seconds + props.put("linger.ms", 10000); // Send the batch every 10 seconds props.put("buffer.memory", 33554432); props.put("producer.type", "async"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); this.kafkaTopic = getKafkaTopicPrefix(kafkaTopicPrefix); - logger.info("Starting kafka producer: bootstrap-server:{}, topic : {}", kafkaHost, kafkaTopic); + logger.info("Starting kafka producer: bootstrap-server:{}, topic : {}", kafkaHost, this.kafkaTopic); this.producer = new KafkaProducer<>(props); if(ServerSettings.isRunningOnAws()) { final AwsMetadata awsMetadata = new AwsMetadata(); @@ -78,6 +78,7 @@ public class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> { @Override protected void append(ILoggingEvent event) { + event.prepareForDeferredProcessing(); //todo do more elegant streaming approach to publish logs if (!event.getLevel().equals(Level.ALL) && // OFF AND ALL are not loggable levels !event.getLevel().equals(Level.OFF)) { @@ -103,6 +104,11 @@ public class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> { private String getKafkaTopicPrefix(String kafkaTopicPrefix) { final StringBuilder stringBuffer = new StringBuilder(""); + final String[] serverRoles = ServerSettings.getServerRoles(); + if (serverRoles.length == 4) { + return kafkaTopicPrefix + "_all"; + } + for (String role : ServerSettings.getServerRoles()) { stringBuffer.append("_"); stringBuffer.append(role); http://git-wip-us.apache.org/repos/asf/airavata/blob/58183be1/modules/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/modules/distribution/pom.xml b/modules/distribution/pom.xml index 01ff201..d096739 100644 --- a/modules/distribution/pom.xml +++ b/modules/distribution/pom.xml @@ -164,14 +164,6 @@ <artifactId>jcl-over-slf4j</artifactId> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </dependency> - <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk</artifactId> <version>1.9.0</version> http://git-wip-us.apache.org/repos/asf/airavata/blob/58183be1/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java index bcde1e9..134267e 100644 --- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java +++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java @@ -170,7 +170,9 @@ public class ServerMain { kafkaAppender.setName("kafka-appender"); kafkaAppender.clearAllFilters(); kafkaAppender.start(); - ((LoggerContext) iLoggerFactory).getLogger(Logger.ROOT_LOGGER_NAME).addAppender(kafkaAppender); + ((LoggerContext) iLoggerFactory).getLogger("org.apache.airavata").addAppender(kafkaAppender); + } else { + logger.warn("Kafka logging is enabled but cannot find logback LoggerContext, found", iLoggerFactory.getClass().toString()); } }
