This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f3157198f17608359bea0b1a3576667bcae96bf5
Author: Andrea Tarocchi <[email protected]>
AuthorDate: Thu Feb 4 23:12:27 2021 +0100

    fixed #980 : camel.source.contentLogLevel config not honored in source 
connectors
---
 .../org/apache/camel/kafkaconnector/CamelSinkTask.java    | 14 +++++++++++---
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java  | 15 +++++++++++----
 .../apache/camel/kafkaconnector/CamelSinkTaskTest.java    | 13 +++++++++++++
 .../apache/camel/kafkaconnector/CamelSourceTaskTest.java  | 14 ++++++++++++++
 4 files changed, 49 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 50a44da..bc6a76c 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -54,10 +54,10 @@ public class CamelSinkTask extends SinkTask {
     private static final String LOCAL_URL = "direct:start";
     private ErrantRecordReporter reporter;
 
-
     private CamelKafkaConnectMain cms;
     private ProducerTemplate producer;
     private Endpoint localEndpoint;
+
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
     private boolean mapProperties;
     private boolean mapHeaders;
@@ -78,11 +78,11 @@ public class CamelSinkTask extends SinkTask {
                 reporter = context.errantRecordReporter();
             }
 
+            String levelStr = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
             try {
-                String levelStr = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
                 loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
             } catch (Exception e) {
-                LOG.debug("Invalid value for {} property", 
CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+                LOG.debug("Invalid value {} for {} property", 
levelStr.toUpperCase(), 
CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
             }
 
             String remoteUrl = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
@@ -234,4 +234,12 @@ public class CamelSinkTask extends SinkTask {
     CamelKafkaConnectMain getCms() {
         return cms;
     }
+
+    public LoggingLevel getLoggingLevel() {
+        return loggingLevel;
+    }
+
+    public void setLoggingLevel(LoggingLevel loggingLevel) {
+        this.loggingLevel = loggingLevel;
+    }
 }
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 4c138af..16e6bfc 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -73,11 +73,11 @@ public class CamelSourceTask extends SourceTask {
             Map<String, String> actualProps = 
TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props);
             CamelSourceConnectorConfig config = 
getCamelSourceConnectorConfig(actualProps);
 
+            String levelStr = 
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
             try {
-                String levelStr = 
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
-                loggingLevel = LoggingLevel.valueOf(levelStr.toLowerCase());
+                loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
             } catch (Exception e) {
-                LOG.debug("Invalid value for {} property", 
CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
+                LOG.error("Invalid value {} for {} property", 
levelStr.toUpperCase(), 
CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
             }
 
             maxBatchPollSize = 
config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
@@ -153,7 +153,6 @@ public class CamelSourceTask extends SourceTask {
         return maxPollDuration - (Instant.now().toEpochMilli() - 
startPollEpochMilli);
     }
 
-
     @Override
     public synchronized List<SourceRecord> poll() {
         final long startPollEpochMilli = Instant.now().toEpochMilli();
@@ -313,4 +312,12 @@ public class CamelSourceTask extends SourceTask {
     CamelKafkaConnectMain getCms() {
         return cms;
     }
+
+    public LoggingLevel getLoggingLevel() {
+        return loggingLevel;
+    }
+
+    public void setLoggingLevel(LoggingLevel loggingLevel) {
+        this.loggingLevel = loggingLevel;
+    }
 }
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 5aaca7f..bab0a5d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -1111,4 +1111,17 @@ public class CamelSinkTaskTest {
         }
     }
 
+    @Test
+    public void testContentLogLevelConfiguration() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, 
"INFO");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+        assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel());
+
+        sinkTask.stop();
+    }
 }
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index b2a7c4e..21d56fc 100644
--- 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -607,4 +607,18 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
+
+    @Test
+    public void testContentLogLevelConfiguration() {
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, 
DIRECT_URI);
+        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, 
"INFO");
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+        assertEquals(LoggingLevel.INFO, sourceTask.getLoggingLevel());
+
+        sourceTask.stop();
+    }
 }

Reply via email to