This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit f3157198f17608359bea0b1a3576667bcae96bf5 Author: Andrea Tarocchi <[email protected]> AuthorDate: Thu Feb 4 23:12:27 2021 +0100 fixed #980 : camel.source.contentLogLevel config not honored in source connectors --- .../org/apache/camel/kafkaconnector/CamelSinkTask.java | 14 +++++++++++--- .../org/apache/camel/kafkaconnector/CamelSourceTask.java | 15 +++++++++++---- .../apache/camel/kafkaconnector/CamelSinkTaskTest.java | 13 +++++++++++++ .../apache/camel/kafkaconnector/CamelSourceTaskTest.java | 14 ++++++++++++++ 4 files changed, 49 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 50a44da..bc6a76c 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -54,10 +54,10 @@ public class CamelSinkTask extends SinkTask { private static final String LOCAL_URL = "direct:start"; private ErrantRecordReporter reporter; - private CamelKafkaConnectMain cms; private ProducerTemplate producer; private Endpoint localEndpoint; + private LoggingLevel loggingLevel = LoggingLevel.OFF; private boolean mapProperties; private boolean mapHeaders; @@ -78,11 +78,11 @@ public class CamelSinkTask extends SinkTask { reporter = context.errantRecordReporter(); } + String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF); try { - String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF); loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase()); } catch (Exception e) { - LOG.debug("Invalid value for {} property", CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF); + LOG.debug("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF); } String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF); @@ -234,4 +234,12 @@ public class CamelSinkTask extends SinkTask { CamelKafkaConnectMain getCms() { return cms; } + + public LoggingLevel getLoggingLevel() { + return loggingLevel; + } + + public void setLoggingLevel(LoggingLevel loggingLevel) { + this.loggingLevel = loggingLevel; + } } diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 4c138af..16e6bfc 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -73,11 +73,11 @@ public class CamelSourceTask extends SourceTask { Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props); CamelSourceConnectorConfig config = getCamelSourceConnectorConfig(actualProps); + String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF); try { - String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF); - loggingLevel = LoggingLevel.valueOf(levelStr.toLowerCase()); + loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase()); } catch (Exception e) { - LOG.debug("Invalid value for {} property", CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF); + LOG.error("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF); } maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF); @@ -153,7 +153,6 @@ public class CamelSourceTask extends SourceTask { return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli); } - @Override public synchronized List<SourceRecord> poll() { final long startPollEpochMilli = Instant.now().toEpochMilli(); @@ -313,4 +312,12 @@ public class CamelSourceTask extends SourceTask { CamelKafkaConnectMain getCms() { return cms; } + + public LoggingLevel getLoggingLevel() { + return loggingLevel; + } + + public void setLoggingLevel(LoggingLevel loggingLevel) { + this.loggingLevel = loggingLevel; + } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 5aaca7f..bab0a5d 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -1111,4 +1111,17 @@ public class CamelSinkTaskTest { } } + @Test + public void testContentLogLevelConfiguration() { + Map<String, String> props = new HashMap<>(); + props.put(TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO"); + + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel()); + + sinkTask.stop(); + } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index b2a7c4e..21d56fc 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -607,4 +607,18 @@ public class CamelSourceTaskTest { sourceTask.stop(); } } + + @Test + public void testContentLogLevelConfiguration() { + Map<String, String> props = new HashMap<>(); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, "INFO"); + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); + assertEquals(LoggingLevel.INFO, sourceTask.getLoggingLevel()); + + sourceTask.stop(); + } }
