This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch kamelets
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 9b229ed862ab3bcbd5be1712ca7c00479e0d5c80
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Sat May 15 07:59:12 2021 +0200

    Related to #423: resolved a problem with marshal/unmarshal after fixing 
https://issues.apache.org/jira/browse/CAMEL-16551
---
 .../utils/CamelKafkaConnectMain.java               | 29 +++++++++++-----------
 .../camel/kafkaconnector/CamelSourceTaskTest.java  |  5 ----
 2 files changed, 14 insertions(+), 20 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 6e7dbdf..036375b 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -228,16 +228,15 @@ public class CamelKafkaConnectMain extends SimpleMain {
             Properties camelProperties = new Properties();
             camelProperties.putAll(props);
 
-            //TODO: enable or delete these parameters once 
https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//            //dataformats
-//            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-//                
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
-//                
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
-//            }
-//            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-//                
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLETE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
-//                
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
-//            }
+            //dataformats
+            if (!ObjectHelper.isEmpty(marshallDataFormat)) {
+                
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
+                
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"marshall", marshallDataFormat);
+            }
+            if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
+                
camelProperties.put(CamelSourceTask.KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
+                
camelProperties.put(CamelSinkTask.KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + 
"unmarshall", unmarshallDataFormat);
+            }
 
             //aggregator
             if (!ObjectHelper.isEmpty(aggregationSize)) {
@@ -310,9 +309,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     RouteTemplateDefinition rtdSource = 
routeTemplate("ckcSource")
                             .templateParameter("fromUrl")
                             .templateParameter("errorHandler", 
"ckcErrorHandler")
-                            //TODO: enable or delete these parameters once 
https://issues.apache.org/jira/browse/CAMEL-16551 is resolved
-//                            .templateParameter("marshall", "dummyDataformat")
-//                            .templateParameter("unmarshall", 
"dummyDataformat")
+
+                            .templateParameter("marshall", "dummyDataformat")
+                            .templateParameter("unmarshall", "dummyDataformat")
 
                             //TODO: change 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NA to ckcAggregationStrategy?
                             .templateParameter("aggregationStrategy", 
CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME)
@@ -327,10 +326,10 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     ProcessorDefinition<?> rdInTemplateSource = 
rtdSource.from("{{fromUrl}}")
                             .errorHandler(new 
ErrorHandlerBuilderRef("{{errorHandler}}"));
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {
-                        rdInTemplateSource = 
rdInTemplateSource.marshal(marshallDataFormat);
+                        rdInTemplateSource = 
rdInTemplateSource.marshal("{{marshall}}");
                     }
                     if (!ObjectHelper.isEmpty(unmarshallDataFormat)) {
-                        rdInTemplateSource = 
rdInTemplateSource.unmarshal(unmarshallDataFormat);
+                        rdInTemplateSource = 
rdInTemplateSource.unmarshal("{{unmarshall}}");
                     }
 
                     if (getContext().getRegistry().lookupByName("aggregate") 
!= null) {
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index b1271ac..36ae9e2 100644
--- 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -235,9 +235,6 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-//        assertEquals(4, 
sourceTask.getCms().getCamelContext().getEndpoints().size());
-
-
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("timer"))
                 .forEach(e -> {
@@ -261,8 +258,6 @@ public class CamelSourceTaskTest {
         CamelSourceTask sourceTask = new CamelSourceTask();
         sourceTask.start(props);
 
-//        assertEquals(4, 
sourceTask.getCms().getCamelContext().getEndpoints().size());
-
         sourceTask.getCms().getCamelContext().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("seda"))
                 .forEach(e -> {

Reply via email to