This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a61f3980eb4b CAMEL-22567 - Camel-Flink: Deprecate DataSet API in favor of DataStream API (#19605)
a61f3980eb4b is described below

commit a61f3980eb4b7c1cd3d00608505ebba5ef40ae96
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Oct 17 13:56:43 2025 +0200

    CAMEL-22567 - Camel-Flink: Deprecate DataSet API in favor of DataStream API (#19605)
    
    * CAMEL-22567 - Camel-Flink: Deprecate DataSet API in favor of DataStream API
    
    Signed-off-by: Andrea Cosentino <[email protected]>
    
    * CAMEL-22567 - Camel-Flink: Deprecate DataSet API in favor of DataStream API
    
    Signed-off-by: Andrea Cosentino <[email protected]>
    
    ---------
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 .../org/apache/camel/catalog/components/flink.json |  12 +-
 .../component/flink/FlinkEndpointConfigurer.java   |  45 +++
 .../component/flink/FlinkEndpointUriFactory.java   |  10 +-
 .../org/apache/camel/component/flink/flink.json    |  12 +-
 .../camel-flink/src/main/docs/flink-component.adoc | 210 +++++++++++++
 .../component/flink/ConvertingDataSetCallback.java |   9 +
 .../camel/component/flink/DataSetCallback.java     |   6 +-
 .../component/flink/DataSetFlinkProducer.java      |   7 +
 .../component/flink/DataStreamFlinkProducer.java   |  97 ++++++
 .../camel/component/flink/FlinkComponent.java      |   6 +
 .../camel/component/flink/FlinkEndpoint.java       | 114 +++++++
 .../camel/component/flink/VoidDataSetCallback.java |   8 +
 .../annotations/AnnotatedDataSetCallback.java      |   5 +
 .../flink/annotations/DataSetCallback.java         |   8 +
 .../flink/DataStreamBatchProcessingIT.java         | 292 ++++++++++++++++++
 .../component/flink/DataStreamConfigurationIT.java | 329 +++++++++++++++++++++
 .../component/flink/DataStreamEdgeCasesIT.java     | 277 +++++++++++++++++
 .../component/flink/DataStreamProducerTest.java    | 117 ++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_16.adoc    | 149 +++++++++-
 .../dsl/FlinkComponentBuilderFactory.java          |   1 +
 .../endpoint/dsl/FlinkEndpointBuilderFactory.java  | 199 +++++++++++++
 21 files changed, 1906 insertions(+), 7 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/flink.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/flink.json
index 85f0a4f96399..a19a18dabbb5 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/flink.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/flink.json
@@ -24,7 +24,7 @@
     "remote": true
   },
   "componentProperties": {
-    "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data 
Set Callback", "group": "producer", "label": "", "required": false, "type": 
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Function performing action against a DataSet." },
+    "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data 
Set Callback", "group": "producer", "label": "", "required": false, "type": 
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback", 
"deprecated": true, "autowired": false, "secret": false, "description": 
"Function performing action against a DataSet." },
     "dataStream": { "index": 1, "kind": "property", "displayName": "Data 
Stream", "group": "producer", "label": "", "required": false, "type": "object", 
"javaType": "org.apache.flink.streaming.api.datastream.DataStream", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"DataStream to compute against." },
     "dataStreamCallback": { "index": 2, "kind": "property", "displayName": 
"Data Stream Callback", "group": "producer", "label": "", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.flink.DataStreamCallback", "deprecated": false, 
"autowired": false, "secret": false, "description": "Function performing action 
against a DataStream." },
     "lazyStartProducer": { "index": 3, "kind": "property", "displayName": 
"Lazy Start Producer", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether the producer should be started lazy (on the first message). By 
starting lazy you can use this to allow CamelContext and routes to startup in 
situations where a producer may otherwise fail [...]
@@ -43,6 +43,14 @@
     "dataSetCallback": { "index": 3, "kind": "parameter", "displayName": "Data 
Set Callback", "group": "producer", "label": "", "required": false, "type": 
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Function performing action against a DataSet." },
     "dataStream": { "index": 4, "kind": "parameter", "displayName": "Data 
Stream", "group": "producer", "label": "", "required": false, "type": "object", 
"javaType": "org.apache.flink.streaming.api.datastream.DataStream", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"DataStream to compute against." },
     "dataStreamCallback": { "index": 5, "kind": "parameter", "displayName": 
"Data Stream Callback", "group": "producer", "label": "", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.flink.DataStreamCallback", "deprecated": false, 
"autowired": false, "secret": false, "description": "Function performing action 
against a DataStream." },
-    "lazyStartProducer": { "index": 6, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produc [...]
+    "checkpointingMode": { "index": 6, "kind": "parameter", "displayName": 
"Checkpointing Mode", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "enum", "javaType": 
"java.lang.String", "enum": [ "EXACTLY_ONCE", "AT_LEAST_ONCE" ], "deprecated": 
false, "autowired": false, "secret": false, "description": "Checkpointing mode: 
EXACTLY_ONCE (default) or AT_LEAST_ONCE. EXACTLY_ONCE provides stronger 
guarantees but may have higher overhead." },
+    "checkpointInterval": { "index": 7, "kind": "parameter", "displayName": 
"Checkpoint Interval", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "integer", "javaType": 
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false, 
"description": "Interval in milliseconds between checkpoints. Enables 
checkpointing when set. Recommended for streaming jobs to ensure fault 
tolerance." },
+    "checkpointTimeout": { "index": 8, "kind": "parameter", "displayName": 
"Checkpoint Timeout", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "integer", "javaType": 
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false, 
"description": "Timeout in milliseconds for checkpoints. Checkpoints that take 
longer will be aborted." },
+    "executionMode": { "index": 9, "kind": "parameter", "displayName": 
"Execution Mode", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ 
"STREAMING", "BATCH", "AUTOMATIC" ], "deprecated": false, "autowired": false, 
"secret": false, "description": "Execution mode for the Flink job. Options: 
STREAMING (default), BATCH, AUTOMATIC. BATCH mode is recommended for bounded 
streams (batch processing)." },
+    "jobName": { "index": 10, "kind": "parameter", "displayName": "Job Name", 
"group": "producer (advanced)", "label": "producer,advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Name for the Flink job. 
Useful for identification in the Flink UI and logs." },
+    "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "maxParallelism": { "index": 12, "kind": "parameter", "displayName": "Max 
Parallelism", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "integer", "javaType": "java.lang.Integer", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Maximum parallelism for the Flink job. Defines the upper bound for dynamic 
scaling and the number of key groups for stateful operators." },
+    "minPauseBetweenCheckpoints": { "index": 13, "kind": "parameter", 
"displayName": "Min Pause Between Checkpoints", "group": "producer (advanced)", 
"label": "producer,advanced", "required": false, "type": "integer", "javaType": 
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false, 
"description": "Minimum pause in milliseconds between consecutive checkpoints. 
Helps prevent checkpoint storms under heavy load." },
+    "parallelism": { "index": 14, "kind": "parameter", "displayName": 
"Parallelism", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "integer", "javaType": "java.lang.Integer", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Parallelism for the Flink job. If not set, uses the default parallelism of the 
execution environment." }
   }
 }
diff --git a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java
index 79b1a2aa6f47..07206669e81b 100644
--- a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java
+++ b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java
@@ -23,6 +23,12 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
     public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
         FlinkEndpoint target = (FlinkEndpoint) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "checkpointinterval":
+        case "checkpointInterval": target.setCheckpointInterval(property(camelContext, java.lang.Long.class, value)); return true;
+        case "checkpointtimeout":
+        case "checkpointTimeout": target.setCheckpointTimeout(property(camelContext, java.lang.Long.class, value)); return true;
+        case "checkpointingmode":
+        case "checkpointingMode": target.setCheckpointingMode(property(camelContext, java.lang.String.class, value)); return true;
         case "collect": target.setCollect(property(camelContext, boolean.class, value)); return true;
         case "dataset":
         case "dataSet": target.setDataSet(property(camelContext, org.apache.flink.api.java.DataSet.class, value)); return true;
@@ -32,8 +38,17 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
         case "dataStream": target.setDataStream(property(camelContext, org.apache.flink.streaming.api.datastream.DataStream.class, value)); return true;
         case "datastreamcallback":
         case "dataStreamCallback": target.setDataStreamCallback(property(camelContext, org.apache.camel.component.flink.DataStreamCallback.class, value)); return true;
+        case "executionmode":
+        case "executionMode": target.setExecutionMode(property(camelContext, java.lang.String.class, value)); return true;
+        case "jobname":
+        case "jobName": target.setJobName(property(camelContext, java.lang.String.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "maxparallelism":
+        case "maxParallelism": target.setMaxParallelism(property(camelContext, java.lang.Integer.class, value)); return true;
+        case "minpausebetweencheckpoints":
+        case "minPauseBetweenCheckpoints": target.setMinPauseBetweenCheckpoints(property(camelContext, java.lang.Long.class, value)); return true;
+        case "parallelism": target.setParallelism(property(camelContext, java.lang.Integer.class, value)); return true;
         default: return false;
         }
     }
@@ -41,6 +56,12 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
     @Override
     public Class<?> getOptionType(String name, boolean ignoreCase) {
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "checkpointinterval":
+        case "checkpointInterval": return java.lang.Long.class;
+        case "checkpointtimeout":
+        case "checkpointTimeout": return java.lang.Long.class;
+        case "checkpointingmode":
+        case "checkpointingMode": return java.lang.String.class;
         case "collect": return boolean.class;
         case "dataset":
         case "dataSet": return org.apache.flink.api.java.DataSet.class;
@@ -50,8 +71,17 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
         case "dataStream": return org.apache.flink.streaming.api.datastream.DataStream.class;
         case "datastreamcallback":
         case "dataStreamCallback": return org.apache.camel.component.flink.DataStreamCallback.class;
+        case "executionmode":
+        case "executionMode": return java.lang.String.class;
+        case "jobname":
+        case "jobName": return java.lang.String.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
+        case "maxparallelism":
+        case "maxParallelism": return java.lang.Integer.class;
+        case "minpausebetweencheckpoints":
+        case "minPauseBetweenCheckpoints": return java.lang.Long.class;
+        case "parallelism": return java.lang.Integer.class;
         default: return null;
         }
     }
@@ -60,6 +90,12 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
     public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
         FlinkEndpoint target = (FlinkEndpoint) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "checkpointinterval":
+        case "checkpointInterval": return target.getCheckpointInterval();
+        case "checkpointtimeout":
+        case "checkpointTimeout": return target.getCheckpointTimeout();
+        case "checkpointingmode":
+        case "checkpointingMode": return target.getCheckpointingMode();
         case "collect": return target.isCollect();
         case "dataset":
         case "dataSet": return target.getDataSet();
@@ -69,8 +105,17 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
         case "dataStream": return target.getDataStream();
         case "datastreamcallback":
         case "dataStreamCallback": return target.getDataStreamCallback();
+        case "executionmode":
+        case "executionMode": return target.getExecutionMode();
+        case "jobname":
+        case "jobName": return target.getJobName();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
+        case "maxparallelism":
+        case "maxParallelism": return target.getMaxParallelism();
+        case "minpausebetweencheckpoints":
+        case "minPauseBetweenCheckpoints": return 
target.getMinPauseBetweenCheckpoints();
+        case "parallelism": return target.getParallelism();
         default: return null;
         }
     }
diff --git a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java
index 1807f4f165a0..b3d97dff643e 100644
--- a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java
+++ b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java
@@ -23,14 +23,22 @@ public class FlinkEndpointUriFactory extends org.apache.camel.support.component.
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Map<String, String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(7);
+        Set<String> props = new HashSet<>(15);
+        props.add("checkpointInterval");
+        props.add("checkpointTimeout");
+        props.add("checkpointingMode");
         props.add("collect");
         props.add("dataSet");
         props.add("dataSetCallback");
         props.add("dataStream");
         props.add("dataStreamCallback");
         props.add("endpointType");
+        props.add("executionMode");
+        props.add("jobName");
         props.add("lazyStartProducer");
+        props.add("maxParallelism");
+        props.add("minPauseBetweenCheckpoints");
+        props.add("parallelism");
         PROPERTY_NAMES = Collections.unmodifiableSet(props);
         SECRET_PROPERTY_NAMES = Collections.emptySet();
         MULTI_VALUE_PREFIXES = Collections.emptyMap();
diff --git a/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json b/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json
index 85f0a4f96399..a19a18dabbb5 100644
--- a/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json
+++ b/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json
@@ -24,7 +24,7 @@
     "remote": true
   },
   "componentProperties": {
-    "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data 
Set Callback", "group": "producer", "label": "", "required": false, "type": 
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Function performing action against a DataSet." },
+    "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data 
Set Callback", "group": "producer", "label": "", "required": false, "type": 
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback", 
"deprecated": true, "autowired": false, "secret": false, "description": 
"Function performing action against a DataSet." },
     "dataStream": { "index": 1, "kind": "property", "displayName": "Data 
Stream", "group": "producer", "label": "", "required": false, "type": "object", 
"javaType": "org.apache.flink.streaming.api.datastream.DataStream", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"DataStream to compute against." },
     "dataStreamCallback": { "index": 2, "kind": "property", "displayName": 
"Data Stream Callback", "group": "producer", "label": "", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.flink.DataStreamCallback", "deprecated": false, 
"autowired": false, "secret": false, "description": "Function performing action 
against a DataStream." },
     "lazyStartProducer": { "index": 3, "kind": "property", "displayName": 
"Lazy Start Producer", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether the producer should be started lazy (on the first message). By 
starting lazy you can use this to allow CamelContext and routes to startup in 
situations where a producer may otherwise fail [...]
@@ -43,6 +43,14 @@
     "dataSetCallback": { "index": 3, "kind": "parameter", "displayName": "Data 
Set Callback", "group": "producer", "label": "", "required": false, "type": 
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Function performing action against a DataSet." },
     "dataStream": { "index": 4, "kind": "parameter", "displayName": "Data 
Stream", "group": "producer", "label": "", "required": false, "type": "object", 
"javaType": "org.apache.flink.streaming.api.datastream.DataStream", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"DataStream to compute against." },
     "dataStreamCallback": { "index": 5, "kind": "parameter", "displayName": 
"Data Stream Callback", "group": "producer", "label": "", "required": false, 
"type": "object", "javaType": 
"org.apache.camel.component.flink.DataStreamCallback", "deprecated": false, 
"autowired": false, "secret": false, "description": "Function performing action 
against a DataStream." },
-    "lazyStartProducer": { "index": 6, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produc [...]
+    "checkpointingMode": { "index": 6, "kind": "parameter", "displayName": 
"Checkpointing Mode", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "enum", "javaType": 
"java.lang.String", "enum": [ "EXACTLY_ONCE", "AT_LEAST_ONCE" ], "deprecated": 
false, "autowired": false, "secret": false, "description": "Checkpointing mode: 
EXACTLY_ONCE (default) or AT_LEAST_ONCE. EXACTLY_ONCE provides stronger 
guarantees but may have higher overhead." },
+    "checkpointInterval": { "index": 7, "kind": "parameter", "displayName": 
"Checkpoint Interval", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "integer", "javaType": 
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false, 
"description": "Interval in milliseconds between checkpoints. Enables 
checkpointing when set. Recommended for streaming jobs to ensure fault 
tolerance." },
+    "checkpointTimeout": { "index": 8, "kind": "parameter", "displayName": 
"Checkpoint Timeout", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "integer", "javaType": 
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false, 
"description": "Timeout in milliseconds for checkpoints. Checkpoints that take 
longer will be aborted." },
+    "executionMode": { "index": 9, "kind": "parameter", "displayName": 
"Execution Mode", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ 
"STREAMING", "BATCH", "AUTOMATIC" ], "deprecated": false, "autowired": false, 
"secret": false, "description": "Execution mode for the Flink job. Options: 
STREAMING (default), BATCH, AUTOMATIC. BATCH mode is recommended for bounded 
streams (batch processing)." },
+    "jobName": { "index": 10, "kind": "parameter", "displayName": "Job Name", 
"group": "producer (advanced)", "label": "producer,advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Name for the Flink job. 
Useful for identification in the Flink UI and logs." },
+    "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "maxParallelism": { "index": 12, "kind": "parameter", "displayName": "Max 
Parallelism", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "integer", "javaType": "java.lang.Integer", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Maximum parallelism for the Flink job. Defines the upper bound for dynamic 
scaling and the number of key groups for stateful operators." },
+    "minPauseBetweenCheckpoints": { "index": 13, "kind": "parameter", 
"displayName": "Min Pause Between Checkpoints", "group": "producer (advanced)", 
"label": "producer,advanced", "required": false, "type": "integer", "javaType": 
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false, 
"description": "Minimum pause in milliseconds between consecutive checkpoints. 
Helps prevent checkpoint storms under heavy load." },
+    "parallelism": { "index": 14, "kind": "parameter", "displayName": 
"Parallelism", "group": "producer (advanced)", "label": "producer,advanced", 
"required": false, "type": "integer", "javaType": "java.lang.Integer", 
"deprecated": false, "autowired": false, "secret": false, "description": 
"Parallelism for the Flink job. If not set, uses the default parallelism of the 
execution environment." }
   }
 }
diff --git a/components/camel-flink/src/main/docs/flink-component.adoc b/components/camel-flink/src/main/docs/flink-component.adoc
index d4172f7e4133..8e86f995c957 100644
--- a/components/camel-flink/src/main/docs/flink-component.adoc
+++ b/components/camel-flink/src/main/docs/flink-component.adoc
@@ -44,6 +44,10 @@ flink:dataset?dataset=#myDataSet&dataSetCallback=#dataSetCallback
 
flink:datastream?datastream=#myDataStream&dataStreamCallback=#dataStreamCallback
 -------------------------------------------------
 
+IMPORTANT: The DataSet API has been deprecated by Apache Flink since version 1.12.
+It is recommended to migrate to the DataStream API with bounded streams for batch processing.
+See the Migration Guide section below for more details.
+
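+A minimal before/after sketch of the migration at the endpoint level (the `#...` registry beans are placeholders):
+
+[source,java]
+-----------------------------------
+// Before: deprecated DataSet endpoint
+from("direct:start")
+    .to("flink:dataset?dataSet=#myDataSet&dataSetCallback=#myDataSetCallback");
+
+// After: DataStream endpoint running in batch mode
+from("direct:start")
+    .to("flink:datastream?dataStream=#myDataStream&dataStreamCallback=#myDataStreamCallback&executionMode=BATCH");
+-----------------------------------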
 
 // component-configure options: START
 
@@ -116,5 +120,211 @@ try {
     }
 -----------------------------------
 
+=== Modern DataStream Batch Processing Example
+
+The recommended approach is to use the DataStream API in batch mode:
+
+[source,java]
+-----------------------------------
+@Bean
+public StreamExecutionEnvironment streamExecutionEnvironment() {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    // Configure for batch processing
+    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+    return env;
+}
+
+@Bean
+public DataStreamSource<String> myDataStream(StreamExecutionEnvironment env) {
+    return env.readTextFile("src/test/resources/testds.txt");
+}
+
+@Bean
+public DataStreamCallback wordCountCallback() {
+    return new VoidDataStreamCallback() {
+        @Override
+        public void doOnDataStream(DataStream dataStream, Object... payloads) throws Exception {
+            dataStream
+                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
+                    for (String word : line.split("\\s+")) {
+                        out.collect(Tuple2.of(word, 1));
+                    }
+                })
+                .returns(Types.TUPLE(Types.STRING, Types.INT))
+                .keyBy(tuple -> tuple.f0)
+                .sum(1)
+                .print();
+        }
+    };
+}
+
+// In your route
+from("direct:wordCount")
+    .to("flink:datastream?dataStream=#myDataStream&dataStreamCallback=#wordCountCallback");
+-----------------------------------
+
+=== Real-time Streaming Example
+
+For true streaming use cases with unbounded data:
+
+[source,java]
+-----------------------------------
+@Bean
+public StreamExecutionEnvironment streamingEnvironment() {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    // Configure for streaming (default mode)
+    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+    // Enable checkpointing for fault tolerance
+    env.enableCheckpointing(10000); // checkpoint every 10 seconds
+    return env;
+}
+
+@Bean
+public DataStreamCallback streamingProcessCallback() {
+    return new VoidDataStreamCallback() {
+        @Override
+        public void doOnDataStream(DataStream dataStream, Object... payloads) throws Exception {
+            dataStream
+                .map(event -> processEvent(event))
+                .keyBy(event -> event.getKey())
+                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
+                .aggregate(new MyAggregateFunction())
+                .addSink(new MyCustomSink());
+
+            // Execute the streaming job
+            dataStream.getExecutionEnvironment().execute("Streaming Job");
+        }
+    };
+}
+-----------------------------------
+
+=== Advanced Configuration Examples
+
+==== Batch Processing with Configuration
+
+Configure a DataStream endpoint for batch processing with custom settings:
+
+[source,java]
+-----------------------------------
+from("direct:batchProcess")
+    .to("flink:datastream?dataStream=#myDataStream"
+        + "&dataStreamCallback=#myCallback"
+        + "&executionMode=BATCH"
+        + "&parallelism=4"
+        + "&jobName=MyBatchJob");
+-----------------------------------
+
+==== Streaming with Checkpointing
+
+Configure a streaming job with checkpointing for fault tolerance:
+
+[source,java]
+-----------------------------------
+from("direct:streamProcess")
+    .to("flink:datastream?dataStream=#myDataStream"
+        + "&dataStreamCallback=#streamCallback"
+        + "&executionMode=STREAMING"
+        + "&checkpointInterval=60000"              // Checkpoint every 60 
seconds
+        + "&checkpointingMode=EXACTLY_ONCE"        // Exactly-once semantics
+        + "&checkpointTimeout=120000"              // 2 minute timeout
+        + "&minPauseBetweenCheckpoints=30000"      // 30 second minimum pause
+        + "&parallelism=8"
+        + "&maxParallelism=128"
+        + "&jobName=StreamingPipeline");
+-----------------------------------
+
+==== Configuration Options Reference
+
+[cols="1,1,1,3", options="header"]
+|===
+|Parameter |Type |Default |Description
+
+|executionMode
+|String
+|STREAMING
+|Runtime execution mode: STREAMING, BATCH, or AUTOMATIC. BATCH is recommended for bounded streams.
+
+|checkpointInterval
+|Long
+|null
+|Checkpoint interval in milliseconds. Setting this enables checkpointing.
+
+|checkpointingMode
+|String
+|EXACTLY_ONCE
+|Checkpointing mode: EXACTLY_ONCE or AT_LEAST_ONCE.
+
+|checkpointTimeout
+|Long
+|10 minutes
+|Maximum time in milliseconds that a checkpoint may take.
+
+|minPauseBetweenCheckpoints
+|Long
+|0
+|Minimum time in milliseconds between consecutive checkpoints.
+
+|parallelism
+|Integer
+|Default parallelism
+|Parallelism for the Flink job operations.
+
+|maxParallelism
+|Integer
+|128
+|Maximum parallelism, which defines the upper bound for dynamic scaling.
+
+|jobName
+|String
+|null
+|Name for the Flink job, useful for identification in Flink UI.
+
+|collect
+|Boolean
+|true
+|Whether to collect results (not applicable for unbounded streams).
+|===
+
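+The AUTOMATIC execution mode is not shown in the examples above; a minimal sketch (the `#boundedStream` and `#autoCallback` beans are hypothetical registry entries):
+
+[source,java]
+-----------------------------------
+from("direct:autoMode")
+    .to("flink:datastream?dataStream=#boundedStream"
+        + "&dataStreamCallback=#autoCallback"
+        + "&executionMode=AUTOMATIC"
+        + "&jobName=AutoModeJob");
+-----------------------------------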
+
+=== Common Patterns
+
+==== Counting Elements
+
+.Before (DataSet)
+[source,java]
+-----------------------------------
+long count = dataSet.count();
+-----------------------------------
+
+.After (DataStream)
+[source,java]
+-----------------------------------
+// DataStream has no global reduce; key all elements to one constant key first
+dataStream
+    .map(e -> 1L)
+    .keyBy(v -> 0)
+    .reduce(Long::sum)
+    .print();
+-----------------------------------
+
+==== Collecting Results
+
+.Before (DataSet)
+[source,java]
+-----------------------------------
+List<String> results = dataSet.collect();
+-----------------------------------
+
+.After (DataStream)
+[source,java]
+-----------------------------------
+// In batch mode, use executeAndCollect() or a sink
+List<String> results = dataStream.executeAndCollect(1000);
+-----------------------------------
+
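+NOTE: `executeAndCollect(n)` executes the pipeline and returns at most `n` elements, so it is suitable only for bounded inputs or small result samples.
+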
+=== Additional Resources
+
+* https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/dataset/overview/[Apache Flink DataSet API Migration Guide]
+* https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/overview/[DataStream API Documentation]
+* https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/execution_mode/[Batch Execution Mode]
 
 include::spring-boot:partial$starter.adoc[]
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java
index f91bd9a366c4..9ba05cb3b0cc 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java
@@ -21,6 +21,15 @@ import org.apache.flink.api.java.DataSet;
 
 import static java.lang.String.format;
 
+/**
+ * DataSet callback with automatic type conversion for payloads.
+ *
+ * @param      <T> results type
+ * @deprecated     The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
+ *                 See the Flink migration guide for details on migrating from DataSet to DataStream API. This class
+ *                 will be maintained for backward compatibility but may be removed in future versions.
+ */
+@Deprecated(since = "4.16.0")
 public abstract class ConvertingDataSetCallback<T> implements DataSetCallback<T> {
 
     private final CamelContext camelContext;
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java
index 3979bfe9208b..93a253250c1d 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java
@@ -21,8 +21,12 @@ import org.apache.flink.api.java.DataSet;
 /**
  * Generic block of code with parameters which can be executed against DataSet and return results.
  *
- * @param <T> results type
+ * @param      <T> results type
+ * @deprecated     The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
+ *                 See the Flink migration guide for details on migrating from DataSet to DataStream API. This class
+ *                 will be maintained for backward compatibility but may be removed in future versions.
  */
+@Deprecated(since = "4.16.0")
 public interface DataSetCallback<T> {
 
     T onDataSet(DataSet ds, Object... payloads);
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java
index 75cb4798d890..dc038b76a8dc 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java
@@ -22,6 +22,13 @@ import org.apache.camel.Exchange;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.flink.api.java.DataSet;
 
+/**
+ * Producer for executing Flink DataSet operations.
+ *
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See
+ *             the Flink migration guide for details on migrating from DataSet to DataStream API.
+ */
+@Deprecated(since = "4.16.0")
 public class DataSetFlinkProducer extends DefaultProducer {
 
     public DataSetFlinkProducer(FlinkEndpoint endpoint) {
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java
index 53d8bf5bac7d..25a359694713 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java
@@ -20,10 +20,23 @@ import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Producer for executing Flink DataStream operations with support for modern Flink features including execution mode
+ * configuration, checkpointing, and parallelism settings.
+ */
 public class DataStreamFlinkProducer extends DefaultProducer {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DataStreamFlinkProducer.class);
+
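+    // set once via double-checked locking in process(): volatile read first, then a synchronized re-check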
+    private volatile boolean environmentConfigured = false;
+
     public DataStreamFlinkProducer(FlinkEndpoint endpoint) {
         super(endpoint);
     }
@@ -31,6 +44,17 @@ public class DataStreamFlinkProducer extends DefaultProducer {
     @Override
     public void process(Exchange exchange) throws Exception {
         DataStream ds = resolveDataStream(exchange);
+
+        // Configure environment on first use when DataStream is available
+        if (!environmentConfigured && ds != null) {
+            synchronized (this) {
+                if (!environmentConfigured) {
+                    configureStreamExecutionEnvironment(ds);
+                    environmentConfigured = true;
+                }
+            }
+        }
+
         DataStreamCallback dataStreamCallback = resolveDataStreamCallback(exchange);
         Object body = exchange.getIn().getBody();
         Object result = body instanceof List
@@ -76,4 +100,77 @@ public class DataStreamFlinkProducer extends DefaultProducer {
             throw new IllegalArgumentException("Cannot resolve DataStream callback.");
         }
     }
+
+    /**
+     * Configures the StreamExecutionEnvironment with the settings from the endpoint. This includes execution mode,
+     * checkpointing, parallelism, and other advanced options.
+     *
+     * @param dataStream the DataStream to configure the environment for
+     */
+    protected void configureStreamExecutionEnvironment(DataStream dataStream) {
+        if (dataStream == null) {
+            LOG.debug("No DataStream provided, skipping environment configuration");
+            return;
+        }
+
+        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
+
+        // Configure execution mode (BATCH, STREAMING, AUTOMATIC)
+        if (getEndpoint().getExecutionMode() != null) {
+            try {
+                RuntimeExecutionMode mode = RuntimeExecutionMode.valueOf(getEndpoint().getExecutionMode());
+                env.setRuntimeMode(mode);
+                LOG.info("Set Flink runtime execution mode to: {}", mode);
+            } catch (IllegalArgumentException e) {
+                LOG.warn("Invalid execution mode '{}'. Valid values are: STREAMING, BATCH, AUTOMATIC",
+                        getEndpoint().getExecutionMode());
+            }
+        }
+
+        // Configure parallelism
+        if (getEndpoint().getParallelism() != null) {
+            env.setParallelism(getEndpoint().getParallelism());
+            LOG.info("Set Flink parallelism to: {}", getEndpoint().getParallelism());
+        }
+
+        // Configure max parallelism
+        if (getEndpoint().getMaxParallelism() != null) {
+            env.setMaxParallelism(getEndpoint().getMaxParallelism());
+            LOG.info("Set Flink max parallelism to: {}", getEndpoint().getMaxParallelism());
+        }
+
+        // Configure checkpointing
+        if (getEndpoint().getCheckpointInterval() != null && getEndpoint().getCheckpointInterval() > 0) {
+            env.enableCheckpointing(getEndpoint().getCheckpointInterval());
+            LOG.info("Enabled checkpointing with interval: {} ms", getEndpoint().getCheckpointInterval());
+
+            // Configure checkpointing mode
+            if (getEndpoint().getCheckpointingMode() != null) {
+                try {
+                    CheckpointingMode mode = CheckpointingMode.valueOf(getEndpoint().getCheckpointingMode());
+                    env.getCheckpointConfig().setCheckpointingMode(mode);
+                    LOG.info("Set checkpointing mode to: {}", mode);
+                } catch (IllegalArgumentException e) {
+                    LOG.warn("Invalid checkpointing mode '{}'. Valid values are: EXACTLY_ONCE, AT_LEAST_ONCE",
+                            getEndpoint().getCheckpointingMode());
+                }
+            }
+
+            // Configure checkpoint timeout
+            if (getEndpoint().getCheckpointTimeout() != null) {
+                env.getCheckpointConfig().setCheckpointTimeout(getEndpoint().getCheckpointTimeout());
+                LOG.info("Set checkpoint timeout to: {} ms", getEndpoint().getCheckpointTimeout());
+            }
+
+            // Configure min pause between checkpoints
+            if (getEndpoint().getMinPauseBetweenCheckpoints() != null) {
+                env.getCheckpointConfig()
+                        .setMinPauseBetweenCheckpoints(getEndpoint().getMinPauseBetweenCheckpoints());
+                LOG.info("Set min pause between checkpoints to: {} ms",
+                        getEndpoint().getMinPauseBetweenCheckpoints());
+            }
+        }
+
+        LOG.debug("StreamExecutionEnvironment configuration completed");
+    }
 }
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
index f0f950153ad3..7c1ee50a48a2 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
@@ -58,7 +58,10 @@ public class FlinkComponent extends DefaultComponent {
 
     /**
      * DataSet to compute against.
+     *
+     * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
      */
+    @Deprecated(since = "4.16.0")
     public void setDataSet(DataSet ds) {
         this.ds = ds;
     }
@@ -80,7 +83,10 @@ public class FlinkComponent extends DefaultComponent {
 
     /**
      * Function performing action against a DataSet.
+     *
+     * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
      */
+    @Deprecated(since = "4.16.0")
     public void setDataSetCallback(DataSetCallback dataSetCallback) {
         this.dataSetCallback = dataSetCallback;
     }
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
index 5d2eea15ae8a..8bffd74cf250 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
@@ -48,6 +48,22 @@ public class FlinkEndpoint extends DefaultEndpoint {
     private DataStreamCallback dataStreamCallback;
     @UriParam(defaultValue = "true")
     private boolean collect = true;
+    @UriParam(label = "producer,advanced", enums = "STREAMING,BATCH,AUTOMATIC")
+    private String executionMode;
+    @UriParam(label = "producer,advanced")
+    private Long checkpointInterval;
+    @UriParam(label = "producer,advanced", enums = 
"EXACTLY_ONCE,AT_LEAST_ONCE")
+    private String checkpointingMode;
+    @UriParam(label = "producer,advanced")
+    private Integer parallelism;
+    @UriParam(label = "producer,advanced")
+    private Integer maxParallelism;
+    @UriParam(label = "producer,advanced")
+    private String jobName;
+    @UriParam(label = "producer,advanced")
+    private Long checkpointTimeout;
+    @UriParam(label = "producer,advanced")
+    private Long minPauseBetweenCheckpoints;
 
     public FlinkEndpoint(String endpointUri, FlinkComponent component, EndpointType endpointType) {
         super(endpointUri, component);
@@ -105,7 +121,10 @@ public class FlinkEndpoint extends DefaultEndpoint {
 
     /**
      * DataSet to compute against.
+     *
+     * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
      */
+    @Deprecated(since = "4.16.0")
     public void setDataSet(DataSet ds) {
         this.dataSet = ds;
     }
@@ -127,7 +146,10 @@ public class FlinkEndpoint extends DefaultEndpoint {
 
     /**
      * Function performing action against a DataSet.
+     *
+     * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
      */
+    @Deprecated(since = "4.16.0")
     public void setDataSetCallback(DataSetCallback dataSetCallback) {
         this.dataSetCallback = dataSetCallback;
     }
@@ -149,4 +171,96 @@ public class FlinkEndpoint extends DefaultEndpoint {
     public void setCollect(boolean collect) {
         this.collect = collect;
     }
+
+    public String getExecutionMode() {
+        return executionMode;
+    }
+
+    /**
+     * Execution mode for the Flink job. Options: STREAMING (default), BATCH, AUTOMATIC. BATCH mode is recommended for
+     * bounded streams (batch processing).
+     */
+    public void setExecutionMode(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    public Long getCheckpointInterval() {
+        return checkpointInterval;
+    }
+
+    /**
+     * Interval in milliseconds between checkpoints. Enables checkpointing when set. Recommended for streaming jobs to
+     * ensure fault tolerance.
+     */
+    public void setCheckpointInterval(Long checkpointInterval) {
+        this.checkpointInterval = checkpointInterval;
+    }
+
+    public String getCheckpointingMode() {
+        return checkpointingMode;
+    }
+
+    /**
+     * Checkpointing mode: EXACTLY_ONCE (default) or AT_LEAST_ONCE. EXACTLY_ONCE provides stronger guarantees but may
+     * have higher overhead.
+     */
+    public void setCheckpointingMode(String checkpointingMode) {
+        this.checkpointingMode = checkpointingMode;
+    }
+
+    public Integer getParallelism() {
+        return parallelism;
+    }
+
+    /**
+     * Parallelism for the Flink job. If not set, uses the default parallelism of the execution environment.
+     */
+    public void setParallelism(Integer parallelism) {
+        this.parallelism = parallelism;
+    }
+
+    public Integer getMaxParallelism() {
+        return maxParallelism;
+    }
+
+    /**
+     * Maximum parallelism for the Flink job. Defines the upper bound for dynamic scaling and the number of key groups
+     * for stateful operators.
+     */
+    public void setMaxParallelism(Integer maxParallelism) {
+        this.maxParallelism = maxParallelism;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    /**
+     * Name for the Flink job. Useful for identification in the Flink UI and logs.
+     */
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public Long getCheckpointTimeout() {
+        return checkpointTimeout;
+    }
+
+    /**
+     * Timeout in milliseconds for checkpoints. Checkpoints that take longer will be aborted.
+     */
+    public void setCheckpointTimeout(Long checkpointTimeout) {
+        this.checkpointTimeout = checkpointTimeout;
+    }
+
+    public Long getMinPauseBetweenCheckpoints() {
+        return minPauseBetweenCheckpoints;
+    }
+
+    /**
+     * Minimum pause in milliseconds between consecutive checkpoints. Helps prevent checkpoint storms under heavy load.
+     */
+    public void setMinPauseBetweenCheckpoints(Long minPauseBetweenCheckpoints) {
+        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+    }
 }
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java
index 250ea1b3d95e..0230da4a92ae 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java
@@ -18,6 +18,14 @@ package org.apache.camel.component.flink;
 
 import org.apache.flink.api.java.DataSet;
 
+/**
+ * Void implementation of DataSetCallback for operations that don't return results.
+ *
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See
+ *             the Flink migration guide for details on migrating from DataSet to DataStream API. This class will be
+ *             maintained for backward compatibility but may be removed in future versions.
+ */
+@Deprecated(since = "4.16.0")
 public abstract class VoidDataSetCallback implements DataSetCallback<Void> {
 
     public abstract void doOnDataSet(DataSet ds, Object... payloads);
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java
index bcd1cc1e6dc0..885427ff5a78 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java
@@ -32,7 +32,12 @@ import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation;
 /**
  * Provides facade for working with annotated DataSet callbacks i.e. POJO classes with an appropriate annotations on
  * selected methods.
+ *
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See
+ *             the Flink migration guide for details on migrating from DataSet to DataStream API. This class will be
+ *             maintained for backward compatibility but may be removed in future versions.
  */
+@Deprecated(since = "4.16.0")
 public class AnnotatedDataSetCallback implements org.apache.camel.component.flink.DataSetCallback {
 
     private final Object objectWithCallback;
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java
index 2f02ce20e6c6..ad177103fe16 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java
@@ -22,6 +22,14 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
+/**
+ * Annotation for marking methods as DataSet callbacks.
+ *
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See
+ *             the Flink migration guide for details on migrating from DataSet to DataStream API. This annotation will
+ *             be maintained for backward compatibility but may be removed in future versions.
+ */
+@Deprecated(since = "4.16.0")
 @Retention(RetentionPolicy.RUNTIME)
 @Target({ ElementType.METHOD, ElementType.PARAMETER })
 @Inherited
diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamBatchProcessingIT.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamBatchProcessingIT.java
new file mode 100644
index 000000000000..28c40445dec2
--- /dev/null
+++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamBatchProcessingIT.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for DataStream batch processing with various configurations. Tests end-to-end scenarios with actual
+ * data transformations.
+ */
+public class DataStreamBatchProcessingIT extends CamelTestSupport {
+
+    // Create separate environments for isolation
+    StreamExecutionEnvironment batchEnv = Flinks.createStreamExecutionEnvironment();
+    StreamExecutionEnvironment transformEnv = Flinks.createStreamExecutionEnvironment();
+
+    @BindToRegistry("numberStream")
+    private DataStreamSource<Integer> numberStream = batchEnv.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+    @BindToRegistry("textStream")
+    private DataStreamSource<String> textStream
+            = transformEnv.fromElements("apache", "camel", "flink", "integration", "test");
+
+    @BindToRegistry("multiplyCallback")
+    public DataStreamCallback multiplyCallback() {
+        return new VoidDataStreamCallback() {
+            @Override
+            public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                int multiplier = (Integer) payloads[0];
+                ds.map((MapFunction<Integer, Integer>) value -> value * multiplier)
+                        .print();
+            }
+        };
+    }
+
+    @Test
+    public void testBatchProcessingWithTransformation() {
+        // Verify that the callback executes without error and the transformation is set up
+        template.sendBodyAndHeader(
+                "direct:batchTransform",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify environment is configured with batch mode and parallelism=2
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(2);
+
+                        // Set up transformation (won't execute in test context)
+                        ds.map((MapFunction<Integer, Integer>) value -> value * 2).print();
+                    }
+                });
+    }
+
+    @Test
+    public void testBatchProcessingWithPayload() {
+        List<Integer> results = new ArrayList<>();
+
+        template.sendBodyAndHeader(
+                "direct:withPayload",
+                3, // multiplier
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        Assertions.assertThat(payloads).hasSize(1);
+                        int multiplier = (Integer) payloads[0];
+                        Assertions.assertThat(multiplier).isEqualTo(3);
+
+                        ds.map((MapFunction<Integer, Integer>) value -> value * multiplier)
+                                .print();
+                    }
+                });
+    }
+
+    @Test
+    public void testBatchProcessingWithFilter() {
+        // Verify filter operation can be set up
+        template.sendBodyAndHeader(
+                "direct:batchFilter",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify environment configuration
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(1);
+
+                        // Set up filter (won't execute in test context)
+                        ds.filter(value -> ((Integer) value) % 2 == 0).print();
+                    }
+                });
+    }
+
+    @Test
+    public void testStringProcessingWithBatchMode() {
+        // Verify string transformation can be set up
+        template.sendBodyAndHeader(
+                "direct:stringTransform",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify environment configuration
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(3);
+
+                        // Set up transformation
+                        ds.map((MapFunction<String, String>) String::toUpperCase).print();
+                    }
+                });
+    }
+
+    @Test
+    public void testHighParallelismProcessing() {
+        // Verify high parallelism configuration
+        template.sendBodyAndHeader(
+                "direct:highParallelism",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+
+                        // Verify high parallelism was set
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(16);
+                        Assertions.assertThat(env.getMaxParallelism()).isEqualTo(256);
+
+                        // Set up transformation
+                        ds.map((MapFunction<Integer, Integer>) value -> value * value).print();
+                    }
+                });
+    }
+
+    @Test
+    public void testCallbackFromRegistry() {
+        // Track that the callback was actually invoked
+        final boolean[] callbackInvoked = { false };
+
+        // Send body with multiplier and verify callback executes
+        template.sendBodyAndHeader(
+                "direct:registryCallback",
+                5,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify the callback is invoked
+                        callbackInvoked[0] = true;
+
+                        // Verify environment configuration
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(4);
+
+                        // Verify payload was passed correctly
+                        Assertions.assertThat(payloads).hasSize(1);
+                        Assertions.assertThat(payloads[0]).isEqualTo(5);
+
+                        // Set up the transformation (using the registry callback pattern)
+                        ds.map((MapFunction<Integer, Integer>) value -> value * (Integer) payloads[0]).print();
+                    }
+                });
+
+        // Verify callback was executed
+        Assertions.assertThat(callbackInvoked[0]).isTrue();
+    }
+
+    @Test
+    public void testMultipleOperations() {
+        // Verify chained operations can be set up
+        template.sendBodyAndHeader(
+                "direct:multipleOps",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify environment configuration
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(4);
+
+                        // Set up chained operations
+                        ds.filter(value -> ((Integer) value) > 3)
+                                .map((MapFunction<Integer, Integer>) value -> value * 10)
+                                .map((MapFunction<Integer, Integer>) value -> value + 5)
+                                .print();
+                    }
+                });
+    }
+
+    @Test
+    public void testConfigurationPersistsAcrossInvocations() {
+        // First invocation
+        template.sendBodyAndHeader(
+                "direct:batchTransform",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(2);
+                    }
+                });
+
+        // Second invocation - should have same configuration
+        template.sendBodyAndHeader(
+                "direct:batchTransform",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(2);
+                    }
+                });
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:batchTransform")
+                        .to("flink:datastream?dataStream=#numberStream"
+                            + "&executionMode=BATCH"
+                            + "&parallelism=2");
+
+                from("direct:withPayload")
+                        .to("flink:datastream?dataStream=#numberStream"
+                            + "&executionMode=BATCH");
+
+                from("direct:batchFilter")
+                        .to("flink:datastream?dataStream=#numberStream"
+                            + "&executionMode=BATCH"
+                            + "&parallelism=1");
+
+                from("direct:stringTransform")
+                        .to("flink:datastream?dataStream=#textStream"
+                            + "&executionMode=BATCH"
+                            + "&parallelism=3");
+
+                from("direct:highParallelism")
+                        .to("flink:datastream?dataStream=#numberStream"
+                            + "&executionMode=BATCH"
+                            + "&parallelism=16"
+                            + "&maxParallelism=256");
+
+                from("direct:registryCallback")
+                        .to("flink:datastream?dataStream=#numberStream"
+                            + "&dataStreamCallback=#multiplyCallback"
+                            + "&executionMode=BATCH"
+                            + "&parallelism=4");
+
+                from("direct:multipleOps")
+                        .to("flink:datastream?dataStream=#numberStream"
+                            + "&executionMode=BATCH"
+                            + "&parallelism=4");
+            }
+        };
+    }
+}
diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamConfigurationIT.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamConfigurationIT.java
new file mode 100644
index 000000000000..cb25c080f640
--- /dev/null
+++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamConfigurationIT.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for DataStream producer configuration options including execution mode, checkpointing, and
+ * parallelism settings.
+ */
+public class DataStreamConfigurationIT extends CamelTestSupport {
+
+    @BindToRegistry("batchDataStream")
+    public DataStreamSource<String> createBatchDataStream() {
+        return Flinks.createStreamExecutionEnvironment().fromElements("test1", "test2", "test3");
+    }
+
+    @BindToRegistry("streamingDataStream")
+    public DataStreamSource<String> createStreamingDataStream() {
+        return Flinks.createStreamExecutionEnvironment().fromElements("stream1", "stream2");
+    }
+
+    @BindToRegistry("checkpointDataStream")
+    public DataStreamSource<String> createCheckpointDataStream() {
+        return Flinks.createStreamExecutionEnvironment().fromElements("data1", "data2");
+    }
+
+    @BindToRegistry("parallelismDataStream")
+    public DataStreamSource<String> createParallelismDataStream() {
+        return Flinks.createStreamExecutionEnvironment().fromElements("parallel1", "parallel2");
+    }
+
+    @BindToRegistry("fullConfigDataStream")
+    public DataStreamSource<String> createFullConfigDataStream() {
+        return Flinks.createStreamExecutionEnvironment().fromElements("config1", "config2");
+    }
+
+    @BindToRegistry("captureEnvCallback")
+    public DataStreamCallback captureEnvCallback() {
+        return new VoidDataStreamCallback() {
+            @Override
+            public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                // Just capture the environment for testing
+                ds.print();
+            }
+        };
+    }
+
+    @Test
+    public void testBatchExecutionModeConfiguration() {
+        AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>();
+
+        template.sendBodyAndHeader(
+                "direct:batchMode",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        envRef.set(env);
+                    }
+                });
+
+        StreamExecutionEnvironment env = envRef.get();
+        Assertions.assertThat(env).isNotNull();
+
+        // Verify BATCH mode was set
+        RuntimeExecutionMode mode = env.getConfiguration()
+                .get(org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE);
+        Assertions.assertThat(mode).isEqualTo(RuntimeExecutionMode.BATCH);
+    }
+
+    @Test
+    public void testStreamingExecutionModeConfiguration() {
+        AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>();
+
+        template.sendBodyAndHeader(
+                "direct:streamingMode",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        envRef.set(env);
+                    }
+                });
+
+        StreamExecutionEnvironment env = envRef.get();
+        Assertions.assertThat(env).isNotNull();
+
+        // Verify STREAMING mode was set
+        RuntimeExecutionMode mode = env.getConfiguration()
+                .get(org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE);
+        Assertions.assertThat(mode).isEqualTo(RuntimeExecutionMode.STREAMING);
+    }
+
+    @Test
+    public void testCheckpointingConfiguration() {
+        AtomicReference<CheckpointConfig> checkpointConfigRef = new AtomicReference<>();
+
+        template.sendBodyAndHeader(
+                "direct:checkpointing",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        checkpointConfigRef.set(env.getCheckpointConfig());
+                    }
+                });
+
+        CheckpointConfig config = checkpointConfigRef.get();
+        Assertions.assertThat(config).isNotNull();
+
+        // Verify checkpoint interval
+        Assertions.assertThat(config.getCheckpointInterval()).isEqualTo(5000L);
+
+        // Verify checkpointing mode
+        Assertions.assertThat(config.getCheckpointingMode()).isEqualTo(CheckpointingMode.EXACTLY_ONCE);
+
+        // Verify checkpoint timeout
+        Assertions.assertThat(config.getCheckpointTimeout()).isEqualTo(30000L);
+
+        // Verify min pause between checkpoints
+        Assertions.assertThat(config.getMinPauseBetweenCheckpoints()).isEqualTo(2000L);
+    }
+
+    @Test
+    public void testParallelismConfiguration() {
+        AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>();
+
+        template.sendBodyAndHeader(
+                "direct:parallelism",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        envRef.set(env);
+                    }
+                });
+
+        StreamExecutionEnvironment env = envRef.get();
+        Assertions.assertThat(env).isNotNull();
+
+        // Verify parallelism settings
+        Assertions.assertThat(env.getParallelism()).isEqualTo(4);
+        Assertions.assertThat(env.getMaxParallelism()).isEqualTo(64);
+    }
+
+    @Test
+    public void testFullConfiguration() {
+        AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>();
+        AtomicReference<CheckpointConfig> checkpointConfigRef = new AtomicReference<>();
+
+        template.sendBodyAndHeader(
+                "direct:fullConfig",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        envRef.set(env);
+                        checkpointConfigRef.set(env.getCheckpointConfig());
+                    }
+                });
+
+        StreamExecutionEnvironment env = envRef.get();
+        CheckpointConfig checkpointConfig = checkpointConfigRef.get();
+
+        Assertions.assertThat(env).isNotNull();
+        Assertions.assertThat(checkpointConfig).isNotNull();
+
+        // Verify execution mode
+        RuntimeExecutionMode mode = env.getConfiguration()
+                .get(org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE);
+        Assertions.assertThat(mode).isEqualTo(RuntimeExecutionMode.STREAMING);
+
+        // Verify parallelism
+        Assertions.assertThat(env.getParallelism()).isEqualTo(8);
+        Assertions.assertThat(env.getMaxParallelism()).isEqualTo(128);
+
+        // Verify checkpointing
+        Assertions.assertThat(checkpointConfig.getCheckpointInterval()).isEqualTo(10000L);
+        Assertions.assertThat(checkpointConfig.getCheckpointingMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+        Assertions.assertThat(checkpointConfig.getCheckpointTimeout()).isEqualTo(60000L);
+        Assertions.assertThat(checkpointConfig.getMinPauseBetweenCheckpoints()).isEqualTo(5000L);
+    }
+
+    @Test
+    public void testInvalidExecutionModeHandling() {
+        // Should not throw exception, just log warning
+        template.sendBodyAndHeader(
+                "direct:invalidMode",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Should execute without error despite invalid mode
+                        Assertions.assertThat(ds).isNotNull();
+                    }
+                });
+    }
+
+    @Test
+    public void testInvalidCheckpointingModeHandling() {
+        // Should not throw exception, just log warning
+        template.sendBodyAndHeader(
+                "direct:invalidCheckpointMode",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Should execute without error despite invalid checkpoint mode
+                        Assertions.assertThat(ds).isNotNull();
+                    }
+                });
+    }
+
+    @Test
+    public void testConfigurationViaRouteParameters() {
+        AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>();
+
+        template.sendBodyAndHeader(
+                "direct:routeConfig",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        envRef.set(env);
+                    }
+                });
+
+        StreamExecutionEnvironment env = envRef.get();
+        Assertions.assertThat(env).isNotNull();
+
+        // Verify the configuration was applied via route parameters
+        Assertions.assertThat(env.getParallelism()).isEqualTo(2);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:batchMode")
+                        .to("flink:datastream?dataStream=#batchDataStream"
+                            + "&executionMode=BATCH");
+
+                from("direct:streamingMode")
+                        .to("flink:datastream?dataStream=#streamingDataStream"
+                            + "&executionMode=STREAMING");
+
+                from("direct:checkpointing")
+                        .to("flink:datastream?dataStream=#checkpointDataStream"
+                            + "&checkpointInterval=5000"
+                            + "&checkpointingMode=EXACTLY_ONCE"
+                            + "&checkpointTimeout=30000"
+                            + "&minPauseBetweenCheckpoints=2000");
+
+                from("direct:parallelism")
+                        .to("flink:datastream?dataStream=#parallelismDataStream"
+                            + "&parallelism=4"
+                            + "&maxParallelism=64");
+
+                from("direct:fullConfig")
+                        .to("flink:datastream?dataStream=#fullConfigDataStream"
+                            + "&executionMode=STREAMING"
+                            + "&checkpointInterval=10000"
+                            + "&checkpointingMode=AT_LEAST_ONCE"
+                            + "&checkpointTimeout=60000"
+                            + "&minPauseBetweenCheckpoints=5000"
+                            + "&parallelism=8"
+                            + "&maxParallelism=128"
+                            + "&jobName=FullConfigTest");
+
+                from("direct:invalidMode")
+                        .to("flink:datastream?dataStream=#batchDataStream"
+                            + "&executionMode=INVALID_MODE"
+                            + "&dataStreamCallback=#captureEnvCallback");
+
+                from("direct:invalidCheckpointMode")
+                        .to("flink:datastream?dataStream=#checkpointDataStream"
+                            + "&checkpointInterval=5000"
+                            + "&checkpointingMode=INVALID_CHECKPOINT_MODE"
+                            + "&dataStreamCallback=#captureEnvCallback");
+
+                from("direct:routeConfig")
+                        .to("flink:datastream?dataStream=#parallelismDataStream"
+                            + "&parallelism=2");
+            }
+        };
+    }
+}
diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamEdgeCasesIT.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamEdgeCasesIT.java
new file mode 100644
index 000000000000..9bcef953c825
--- /dev/null
+++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamEdgeCasesIT.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for edge cases and error handling in DataStream configuration.
+ */
+public class DataStreamEdgeCasesIT extends CamelTestSupport {
+
+    // Create separate environment for isolation
+    StreamExecutionEnvironment testEnv = Flinks.createStreamExecutionEnvironment();
+
+    @BindToRegistry("testDataStream")
+    private DataStreamSource<String> testDs = testEnv.fromElements("test1", "test2");
+
+    @Test
+    public void testMissingDataStreamThrowsException() {
+        // Should throw exception when no DataStream is defined
+        Assertions.assertThatThrownBy(() -> {
+            template.sendBodyAndHeader(
+                    "direct:noDataStream",
+                    null,
+                    FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                    new VoidDataStreamCallback() {
+                        @Override
+                        public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                            ds.print();
+                        }
+                    });
+        }).isInstanceOf(CamelExecutionException.class)
+                .cause()
+                .hasMessageContaining("No DataStream defined");
+    }
+
+    @Test
+    public void testMissingCallbackThrowsException() {
+        // Should throw exception when no callback is defined
+        Assertions.assertThatThrownBy(() -> {
+            template.sendBody("direct:noCallback", null);
+        }).isInstanceOf(CamelExecutionException.class)
+                .cause()
+                .hasMessageContaining("Cannot resolve DataStream callback");
+    }
+
+    @Test
+    public void testInvalidParallelismHandling() {
+        // Flink will reject parallelism of 0 or negative values during configuration
+        // This test verifies that the error is clear
+        Assertions.assertThatThrownBy(() -> {
+            StreamExecutionEnvironment invalidEnv = Flinks.createStreamExecutionEnvironment();
+            invalidEnv.setParallelism(0);
+        }).isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Parallelism must be at least one");
+    }
+
+    @Test
+    public void testCheckpointWithoutIntervalIgnored() {
+        template.sendBodyAndHeader(
+                "direct:checkpointNoInterval",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        // Checkpointing mode is set but interval is not, so checkpointing won't be enabled
+                        Assertions.assertThat(env).isNotNull();
+                    }
+                });
+    }
+
+    @Test
+    public void testNullPayloadsHandling() {
+        template.sendBodyAndHeader(
+                "direct:withDataStream",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Null body results in single null payload
+                        Assertions.assertThat(payloads).hasSize(1);
+                        Assertions.assertThat(payloads[0]).isNull();
+                    }
+                });
+    }
+
+    @Test
+    public void testEmptyListPayload() {
+        template.sendBodyAndHeader(
+                "direct:withDataStream",
+                java.util.Collections.emptyList(),
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Empty list should result in empty array
+                        Assertions.assertThat(payloads).isEmpty();
+                    }
+                });
+    }
+
+    @Test
+    public void testVeryHighParallelism() {
+        template.sendBodyAndHeader(
+                "direct:veryHighParallelism",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        // Very high parallelism should be accepted (Flink will validate)
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(1000);
+                    }
+                });
+    }
+
+    @Test
+    public void testAutomaticExecutionMode() {
+        template.sendBodyAndHeader(
+                "direct:automaticMode",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // AUTOMATIC mode should be accepted
+                        Assertions.assertThat(ds).isNotNull();
+                    }
+                });
+    }
+
+    @Test
+    public void testVeryShortCheckpointInterval() {
+        template.sendBodyAndHeader(
+                "direct:shortCheckpoint",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        // Very short interval (100ms) should be configured, though not recommended
+                        Assertions.assertThat(env.getCheckpointConfig().getCheckpointInterval()).isEqualTo(100L);
+                    }
+                });
+    }
+
+    @Test
+    public void testCallbackExceptionPropagation() {
+        Assertions.assertThatThrownBy(() -> {
+            template.sendBodyAndHeader(
+                    "direct:withDataStream",
+                    null,
+                    FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                    new VoidDataStreamCallback() {
+                        @Override
+                        public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                            throw new RuntimeException("Test exception from callback");
+                        }
+                    });
+        }).isInstanceOf(CamelExecutionException.class)
+                .cause()
+                .hasMessageContaining("Test exception from callback");
+    }
+
+    @Test
+    public void testHeaderOverridesEndpointCallback() {
+        // When both endpoint callback and header callback are present, header should win
+        boolean[] headerCallbackCalled = { false };
+
+        template.sendBodyAndHeader(
+                "direct:withEndpointCallback",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        headerCallbackCalled[0] = true;
+                    }
+                });
+
+        Assertions.assertThat(headerCallbackCalled[0]).isTrue();
+    }
+
+    @Test
+    public void testConfigurationWithAllNullOptionals() {
+        // All optional configuration parameters are null - should use defaults
+        template.sendBodyAndHeader(
+                "direct:minimalConfig",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Should work with defaults
+                        Assertions.assertThat(ds).isNotNull();
+                    }
+                });
+    }
+
+    @BindToRegistry("endpointCallback")
+    public DataStreamCallback endpointCallback() {
+        return new VoidDataStreamCallback() {
+            @Override
+            public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                throw new AssertionError("Endpoint callback should not be called when header is present");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:noDataStream")
+                        .to("flink:datastream");
+
+                from("direct:noCallback")
+                        .to("flink:datastream?dataStream=#testDataStream");
+
+                from("direct:checkpointNoInterval")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&checkpointingMode=EXACTLY_ONCE");
+
+                from("direct:withDataStream")
+                        .to("flink:datastream?dataStream=#testDataStream");
+
+                from("direct:veryHighParallelism")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&parallelism=1000"
+                            + "&maxParallelism=2000");
+
+                from("direct:automaticMode")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&executionMode=AUTOMATIC");
+
+                from("direct:shortCheckpoint")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&checkpointInterval=100");
+
+                from("direct:withEndpointCallback")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&dataStreamCallback=#endpointCallback");
+
+                from("direct:minimalConfig")
+                        .to("flink:datastream?dataStream=#testDataStream");
+            }
+        };
+    }
+}
diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamProducerTest.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamProducerTest.java
new file mode 100644
index 000000000000..f47230c46bc9
--- /dev/null
+++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamProducerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class DataStreamProducerTest extends CamelTestSupport {
+
+    static StreamExecutionEnvironment streamExecutionEnvironment = Flinks.createStreamExecutionEnvironment();
+
+    String flinkDataStreamUri = "flink:dataStream?dataStream=#myDataStream";
+
+    @BindToRegistry("myDataStream")
+    private DataStreamSource<String> dss = streamExecutionEnvironment.readTextFile("src/test/resources/testds.txt");
+
+    @Test
+    public void shouldExecuteDataStreamCallback() {
+        template.sendBodyAndHeader(flinkDataStreamUri, null, FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Just verify the callback is executed
+                        ds.print();
+                    }
+                });
+    }
+
+    @Test
+    public void shouldExecuteDataStreamCallbackWithPayload() {
+        template.sendBodyAndHeader(flinkDataStreamUri, "test-payload",
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        Assertions.assertThat(payloads).hasSize(1);
+                        Assertions.assertThat(payloads[0]).isEqualTo("test-payload");
+                    }
+                });
+    }
+
+    @Test
+    public void shouldExecuteDataStreamCallbackWithMultiplePayloads() {
+        List<String> payloads = Arrays.asList("payload1", "payload2", "payload3");
+        template.sendBodyAndHeader(flinkDataStreamUri, payloads, FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        Assertions.assertThat(payloads).hasSize(3);
+                        Assertions.assertThat(payloads[0]).isEqualTo("payload1");
+                        Assertions.assertThat(payloads[1]).isEqualTo("payload2");
+                        Assertions.assertThat(payloads[2]).isEqualTo("payload3");
+                    }
+                });
+    }
+
+    @Test
+    public void shouldConfigureExecutionMode() {
+        StreamExecutionEnvironment env = streamExecutionEnvironment;
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        Assertions.assertThat(env.getConfiguration().get(
+                org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE))
+                .isEqualTo(RuntimeExecutionMode.BATCH);
+    }
+
+    @Test
+    public void shouldConfigureCheckpointing() {
+        StreamExecutionEnvironment env = Flinks.createStreamExecutionEnvironment();
+        env.enableCheckpointing(5000);
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+        Assertions.assertThat(env.getCheckpointConfig().getCheckpointInterval()).isEqualTo(5000);
+        Assertions.assertThat(env.getCheckpointConfig().getCheckpointingMode())
+                .isEqualTo(CheckpointingMode.EXACTLY_ONCE);
+    }
+
+    @Test
+    public void shouldConfigureParallelism() {
+        StreamExecutionEnvironment env = Flinks.createStreamExecutionEnvironment();
+        env.setParallelism(4);
+
+        Assertions.assertThat(env.getParallelism()).isEqualTo(4);
+    }
+
+    @Test
+    public void shouldConfigureMaxParallelism() {
+        StreamExecutionEnvironment env = Flinks.createStreamExecutionEnvironment();
+        env.setMaxParallelism(128);
+
+        Assertions.assertThat(env.getMaxParallelism()).isEqualTo(128);
+    }
+}
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc
index 20047a339852..59049c6546d6 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc
@@ -94,4 +94,151 @@ item.setDataValueListener(dataValue -> processValue(dataValue));
 item.setDataValueListener((monitoredItem, dataValue) -> processValue(dataValue));
 ----
 
-NOTE: Most users will not be affected by these changes as they primarily affect advanced use cases where you directly interact with the Milo API. Standard camel-milo endpoint configurations remain unchanged.
\ No newline at end of file
+=== camel-flink
+
+Apache Flink deprecated the DataSet API in version 1.12 in favor of a unified DataStream API that handles both
+streaming and batch processing. The DataStream API with bounded streams provides all the functionality of the
+DataSet API and more, with better performance and a unified programming model.
+
+==== Key Differences
+
+[cols="1,1,1", options="header"]
+|===
+|Aspect |DataSet API |DataStream API (Batch Mode)
+
+|Execution
+|Lazy; jobs are triggered by execute() or by eager sinks such as count() and collect()
+|Lazy; jobs require an explicit execute() call
+
+|Data Type
+|Bounded datasets
+|Bounded or unbounded streams
+
+|Time Semantics
+|Not applicable
+|Event time, processing time, ingestion time
+
+|State Management
+|Limited
+|Full support for keyed and operator state
+
+|Windowing
+|Not applicable
+|Full windowing support
+|===
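+
+As a minimal sketch of the unified model (the job and names below are illustrative, not taken from the component),
+a bounded DataStream run in BATCH mode covers the classic DataSet case:
+
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// BATCH mode applies batch-style scheduling to the bounded source below
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
+numbers.map((MapFunction<Integer, Integer>) value -> value * 2).print();
+// Unlike the DataSet API's eager sinks, nothing runs until execute() is called
+env.execute("bounded-batch-example");
+-----------------------------------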
+
+==== Migration Guide
+
+===== Update Endpoint Type
+
+Replace `flink:dataset` with `flink:datastream`:
+
+.Before
+[source,java]
+-----------------------------------
+from("direct:start")
+    .to("flink:dataset?dataSet=#myDataSet&dataSetCallback=#myCallback");
+-----------------------------------
+
+.After
+[source,java]
+-----------------------------------
+from("direct:start")
+    .to("flink:datastream?dataStream=#myDataStream&dataStreamCallback=#myCallback");
+-----------------------------------
+
+===== Configure Batch Execution Mode
+
+For batch processing with the DataStream API, configure the execution environment for batch mode:
+
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// Set to batch mode for bounded streams
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+-----------------------------------
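+
+The same can be configured declaratively on the camel-flink endpoint: the new executionMode option accepts
+STREAMING (the default), BATCH, or AUTOMATIC, as in this sketch:
+
+[source,java]
+-----------------------------------
+from("direct:start")
+    .to("flink:datastream?dataStream=#myDataStream"
+        + "&dataStreamCallback=#myCallback"
+        + "&executionMode=BATCH");
+-----------------------------------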
+
+===== Update Data Sources
+
+.Before (DataSet API)
+[source,java]
+-----------------------------------
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+DataSet<String> dataSet = env.readTextFile("input.txt");
+-----------------------------------
+
+.After (DataStream API)
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+DataStream<String> dataStream = env.readTextFile("input.txt");
+-----------------------------------
+
+===== Update Transformations
+
+Most transformation operations have direct equivalents, as sketched after the table:
+
+[cols="1,1", options="header"]
+|===
+|DataSet API |DataStream API
+
+|`map()`
+|`map()`
+
+|`flatMap()`
+|`flatMap()`
+
+|`filter()`
+|`filter()`
+
+|`reduce()`
+|`reduce()` or `keyBy().reduce()`
+
+|`groupBy()`
+|`keyBy()`
+
+|`join()`
+|`join()` (with windowing)
+
+|`coGroup()`
+|`coGroup()` (with windowing)
+|===
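+
+For example, a grouped reduction migrates as follows (a minimal sketch assuming Tuple2<String, Integer> records;
+the stream and field names are illustrative):
+
+[source,java]
+-----------------------------------
+// Before (DataSet API): dataSet.groupBy(0).reduce(...)
+// After (DataStream API): keyBy() replaces groupBy()
+DataStream<Tuple2<String, Integer>> summed = dataStream
+        .keyBy(value -> value.f0)
+        .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
+-----------------------------------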
+
+===== Update Callbacks
+
+Replace `DataSetCallback` with `DataStreamCallback`:
+
+.Before (DataSet)
+[source,java]
+-----------------------------------
+@Bean
+public DataSetCallback<Long> dataSetCallback() {
+    return new DataSetCallback<Long>() {
+        public Long onDataSet(DataSet dataSet, Object... objects) {
+            try {
+                return dataSet.count();
+            } catch (Exception e) {
+                return -1L;
+            }
+        }
+    };
+}
+-----------------------------------
+
+.After (DataStream)
+[source,java]
+-----------------------------------
+@Bean
+public DataStreamCallback dataStreamCallback() {
+    return new DataStreamCallback() {
+        public Object onDataStream(DataStream dataStream, Object... objects) {
+            // For batch mode, ensure runtime mode is set
+            dataStream.print();
+            return null;
+        }
+    };
+}
+-----------------------------------
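+
+The callback bean is then wired to the route exactly as before, only through the datastream endpoint and option
+names (a sketch; the bean names are illustrative):
+
+[source,java]
+-----------------------------------
+from("direct:count")
+    .to("flink:datastream?dataStream=#myDataStream"
+        + "&dataStreamCallback=#dataStreamCallback");
+-----------------------------------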
+
+NOTE: Most users will not be affected by these changes as they primarily affect advanced use cases where you directly interact with the Milo API. Standard camel-milo endpoint configurations remain unchanged.
diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/FlinkComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/FlinkComponentBuilderFactory.java
index 897e3dc0bb2d..7ec9ea2c6c5f 100644
--- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/FlinkComponentBuilderFactory.java
+++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/FlinkComponentBuilderFactory.java
@@ -61,6 +61,7 @@ public interface FlinkComponentBuilderFactory {
          * @param dataSetCallback the value to set
          * @return the dsl builder
          */
+        @Deprecated
         default FlinkComponentBuilder dataSetCallback(org.apache.camel.component.flink.DataSetCallback dataSetCallback) {
             doSetProperty("dataSetCallback", dataSetCallback);
             return this;
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FlinkEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FlinkEndpointBuilderFactory.java
index 11a4d5f42bee..270771d8568c 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FlinkEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FlinkEndpointBuilderFactory.java
@@ -209,6 +209,113 @@ public interface FlinkEndpointBuilderFactory {
             return (FlinkEndpointBuilder) this;
         }
 
+        /**
+         * Checkpointing mode: EXACTLY_ONCE (default) or AT_LEAST_ONCE.
+         * EXACTLY_ONCE provides stronger guarantees but may have higher
+         * overhead.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param checkpointingMode the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder checkpointingMode(String checkpointingMode) {
+            doSetProperty("checkpointingMode", checkpointingMode);
+            return this;
+        }
+        /**
+         * Interval in milliseconds between checkpoints. Enables checkpointing
+         * when set. Recommended for streaming jobs to ensure fault tolerance.
+         * 
+         * The option is a: <code>java.lang.Long</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param checkpointInterval the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder checkpointInterval(Long checkpointInterval) {
+            doSetProperty("checkpointInterval", checkpointInterval);
+            return this;
+        }
+        /**
+         * Interval in milliseconds between checkpoints. Enables checkpointing
+         * when set. Recommended for streaming jobs to ensure fault tolerance.
+         * 
+         * The option will be converted to a <code>java.lang.Long</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param checkpointInterval the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder checkpointInterval(String checkpointInterval) {
+            doSetProperty("checkpointInterval", checkpointInterval);
+            return this;
+        }
+        /**
+         * Timeout in milliseconds for checkpoints. Checkpoints that take longer
+         * will be aborted.
+         * 
+         * The option is a: <code>java.lang.Long</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param checkpointTimeout the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder checkpointTimeout(Long checkpointTimeout) {
+            doSetProperty("checkpointTimeout", checkpointTimeout);
+            return this;
+        }
+        /**
+         * Timeout in milliseconds for checkpoints. Checkpoints that take longer
+         * will be aborted.
+         * 
+         * The option will be converted to a <code>java.lang.Long</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param checkpointTimeout the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder checkpointTimeout(String checkpointTimeout) {
+            doSetProperty("checkpointTimeout", checkpointTimeout);
+            return this;
+        }
+        /**
+         * Execution mode for the Flink job. Options: STREAMING (default),
+         * BATCH, AUTOMATIC. BATCH mode is recommended for bounded streams
+         * (batch processing).
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param executionMode the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder executionMode(String executionMode) {
+            doSetProperty("executionMode", executionMode);
+            return this;
+        }
+        /**
+         * Name for the Flink job. Useful for identification in the Flink UI and
+         * logs.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param jobName the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder jobName(String jobName) {
+            doSetProperty("jobName", jobName);
+            return this;
+        }
         /**
          * Whether the producer should be started lazy (on the first message).
          * By starting lazy you can use this to allow CamelContext and routes to
@@ -255,6 +362,98 @@ public interface FlinkEndpointBuilderFactory {
             doSetProperty("lazyStartProducer", lazyStartProducer);
             return this;
         }
+        /**
+         * Maximum parallelism for the Flink job. Defines the upper bound for
+         * dynamic scaling and the number of key groups for stateful operators.
+         * 
+         * The option is a: <code>java.lang.Integer</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param maxParallelism the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder maxParallelism(Integer maxParallelism) {
+            doSetProperty("maxParallelism", maxParallelism);
+            return this;
+        }
+        /**
+         * Maximum parallelism for the Flink job. Defines the upper bound for
+         * dynamic scaling and the number of key groups for stateful operators.
+         * 
+         * The option will be converted to a <code>java.lang.Integer</code>
+         * type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param maxParallelism the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder maxParallelism(String maxParallelism) {
+            doSetProperty("maxParallelism", maxParallelism);
+            return this;
+        }
+        /**
+         * Minimum pause in milliseconds between consecutive checkpoints. Helps
+         * prevent checkpoint storms under heavy load.
+         * 
+         * The option is a: <code>java.lang.Long</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param minPauseBetweenCheckpoints the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder minPauseBetweenCheckpoints(Long minPauseBetweenCheckpoints) {
+            doSetProperty("minPauseBetweenCheckpoints", minPauseBetweenCheckpoints);
+            return this;
+        }
+        /**
+         * Minimum pause in milliseconds between consecutive checkpoints. Helps
+         * prevent checkpoint storms under heavy load.
+         * 
+         * The option will be converted to a <code>java.lang.Long</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param minPauseBetweenCheckpoints the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder minPauseBetweenCheckpoints(String minPauseBetweenCheckpoints) {
+            doSetProperty("minPauseBetweenCheckpoints", minPauseBetweenCheckpoints);
+            return this;
+        }
+        /**
+         * Parallelism for the Flink job. If not set, uses the default
+         * parallelism of the execution environment.
+         * 
+         * The option is a: <code>java.lang.Integer</code> type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param parallelism the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder parallelism(Integer parallelism) {
+            doSetProperty("parallelism", parallelism);
+            return this;
+        }
+        /**
+         * Parallelism for the Flink job. If not set, uses the default
+         * parallelism of the execution environment.
+         * 
+         * The option will be converted to a <code>java.lang.Integer</code>
+         * type.
+         * 
+         * Group: producer (advanced)
+         * 
+         * @param parallelism the value to set
+         * @return the dsl builder
+         */
+        default AdvancedFlinkEndpointBuilder parallelism(String parallelism) {
+            doSetProperty("parallelism", parallelism);
+            return this;
+        }
     }
 
     public interface FlinkBuilders {
