This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new a61f3980eb4b CAMEL-22567 - Camel-Flink: Deprecate DataSet API in favor of DataStream API (#19605)
a61f3980eb4b is described below
commit a61f3980eb4b7c1cd3d00608505ebba5ef40ae96
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Oct 17 13:56:43 2025 +0200
CAMEL-22567 - Camel-Flink: Deprecate DataSet API in favor of DataStream API (#19605)
* CAMEL-22567 - Camel-Flink: Deprecate DataSet API in favor of DataStream API
Signed-off-by: Andrea Cosentino <[email protected]>
* CAMEL-22567 - Camel-Flink: Deprecate DataSet API in favor of DataStream API
Signed-off-by: Andrea Cosentino <[email protected]>
---------
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../org/apache/camel/catalog/components/flink.json | 12 +-
.../component/flink/FlinkEndpointConfigurer.java | 45 +++
.../component/flink/FlinkEndpointUriFactory.java | 10 +-
.../org/apache/camel/component/flink/flink.json | 12 +-
.../camel-flink/src/main/docs/flink-component.adoc | 210 +++++++++++++
.../component/flink/ConvertingDataSetCallback.java | 9 +
.../camel/component/flink/DataSetCallback.java | 6 +-
.../component/flink/DataSetFlinkProducer.java | 7 +
.../component/flink/DataStreamFlinkProducer.java | 97 ++++++
.../camel/component/flink/FlinkComponent.java | 6 +
.../camel/component/flink/FlinkEndpoint.java | 114 +++++++
.../camel/component/flink/VoidDataSetCallback.java | 8 +
.../annotations/AnnotatedDataSetCallback.java | 5 +
.../flink/annotations/DataSetCallback.java | 8 +
.../flink/DataStreamBatchProcessingIT.java | 292 ++++++++++++++++++
.../component/flink/DataStreamConfigurationIT.java | 329 +++++++++++++++++++++
.../component/flink/DataStreamEdgeCasesIT.java | 277 +++++++++++++++++
.../component/flink/DataStreamProducerTest.java | 117 ++++++++
.../ROOT/pages/camel-4x-upgrade-guide-4_16.adoc | 149 +++++++++-
.../dsl/FlinkComponentBuilderFactory.java | 1 +
.../endpoint/dsl/FlinkEndpointBuilderFactory.java | 199 +++++++++++++
21 files changed, 1906 insertions(+), 7 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/flink.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/flink.json
index 85f0a4f96399..a19a18dabbb5 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/flink.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/flink.json
@@ -24,7 +24,7 @@
"remote": true
},
"componentProperties": {
- "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data
Set Callback", "group": "producer", "label": "", "required": false, "type":
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback",
"deprecated": false, "autowired": false, "secret": false, "description":
"Function performing action against a DataSet." },
+ "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data
Set Callback", "group": "producer", "label": "", "required": false, "type":
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback",
"deprecated": true, "autowired": false, "secret": false, "description":
"Function performing action against a DataSet." },
"dataStream": { "index": 1, "kind": "property", "displayName": "Data
Stream", "group": "producer", "label": "", "required": false, "type": "object",
"javaType": "org.apache.flink.streaming.api.datastream.DataStream",
"deprecated": false, "autowired": false, "secret": false, "description":
"DataStream to compute against." },
"dataStreamCallback": { "index": 2, "kind": "property", "displayName":
"Data Stream Callback", "group": "producer", "label": "", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.flink.DataStreamCallback", "deprecated": false,
"autowired": false, "secret": false, "description": "Function performing action
against a DataStream." },
"lazyStartProducer": { "index": 3, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fail [...]
@@ -43,6 +43,14 @@
"dataSetCallback": { "index": 3, "kind": "parameter", "displayName": "Data
Set Callback", "group": "producer", "label": "", "required": false, "type":
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback",
"deprecated": false, "autowired": false, "secret": false, "description":
"Function performing action against a DataSet." },
"dataStream": { "index": 4, "kind": "parameter", "displayName": "Data
Stream", "group": "producer", "label": "", "required": false, "type": "object",
"javaType": "org.apache.flink.streaming.api.datastream.DataStream",
"deprecated": false, "autowired": false, "secret": false, "description":
"DataStream to compute against." },
"dataStreamCallback": { "index": 5, "kind": "parameter", "displayName":
"Data Stream Callback", "group": "producer", "label": "", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.flink.DataStreamCallback", "deprecated": false,
"autowired": false, "secret": false, "description": "Function performing action
against a DataStream." },
- "lazyStartProducer": { "index": 6, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produc [...]
+ "checkpointingMode": { "index": 6, "kind": "parameter", "displayName":
"Checkpointing Mode", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "enum", "javaType":
"java.lang.String", "enum": [ "EXACTLY_ONCE", "AT_LEAST_ONCE" ], "deprecated":
false, "autowired": false, "secret": false, "description": "Checkpointing mode:
EXACTLY_ONCE (default) or AT_LEAST_ONCE. EXACTLY_ONCE provides stronger
guarantees but may have higher overhead." },
+ "checkpointInterval": { "index": 7, "kind": "parameter", "displayName":
"Checkpoint Interval", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false,
"description": "Interval in milliseconds between checkpoints. Enables
checkpointing when set. Recommended for streaming jobs to ensure fault
tolerance." },
+ "checkpointTimeout": { "index": 8, "kind": "parameter", "displayName":
"Checkpoint Timeout", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false,
"description": "Timeout in milliseconds for checkpoints. Checkpoints that take
longer will be aborted." },
+ "executionMode": { "index": 9, "kind": "parameter", "displayName":
"Execution Mode", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "enum", "javaType": "java.lang.String", "enum": [
"STREAMING", "BATCH", "AUTOMATIC" ], "deprecated": false, "autowired": false,
"secret": false, "description": "Execution mode for the Flink job. Options:
STREAMING (default), BATCH, AUTOMATIC. BATCH mode is recommended for bounded
streams (batch processing)." },
+ "jobName": { "index": 10, "kind": "parameter", "displayName": "Job Name",
"group": "producer (advanced)", "label": "producer,advanced", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Name for the Flink job.
Useful for identification in the Flink UI and logs." },
+ "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "maxParallelism": { "index": 12, "kind": "parameter", "displayName": "Max
Parallelism", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "integer", "javaType": "java.lang.Integer",
"deprecated": false, "autowired": false, "secret": false, "description":
"Maximum parallelism for the Flink job. Defines the upper bound for dynamic
scaling and the number of key groups for stateful operators." },
+ "minPauseBetweenCheckpoints": { "index": 13, "kind": "parameter",
"displayName": "Min Pause Between Checkpoints", "group": "producer (advanced)",
"label": "producer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false,
"description": "Minimum pause in milliseconds between consecutive checkpoints.
Helps prevent checkpoint storms under heavy load." },
+ "parallelism": { "index": 14, "kind": "parameter", "displayName":
"Parallelism", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "integer", "javaType": "java.lang.Integer",
"deprecated": false, "autowired": false, "secret": false, "description":
"Parallelism for the Flink job. If not set, uses the default parallelism of the
execution environment." }
}
}
diff --git a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java
index 79b1a2aa6f47..07206669e81b 100644
--- a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java
+++ b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java
@@ -23,6 +23,12 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
     public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
         FlinkEndpoint target = (FlinkEndpoint) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "checkpointinterval":
+        case "checkpointInterval": target.setCheckpointInterval(property(camelContext, java.lang.Long.class, value)); return true;
+        case "checkpointtimeout":
+        case "checkpointTimeout": target.setCheckpointTimeout(property(camelContext, java.lang.Long.class, value)); return true;
+        case "checkpointingmode":
+        case "checkpointingMode": target.setCheckpointingMode(property(camelContext, java.lang.String.class, value)); return true;
         case "collect": target.setCollect(property(camelContext, boolean.class, value)); return true;
         case "dataset":
         case "dataSet": target.setDataSet(property(camelContext, org.apache.flink.api.java.DataSet.class, value)); return true;
@@ -32,8 +38,17 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
         case "dataStream": target.setDataStream(property(camelContext, org.apache.flink.streaming.api.datastream.DataStream.class, value)); return true;
         case "datastreamcallback":
         case "dataStreamCallback": target.setDataStreamCallback(property(camelContext, org.apache.camel.component.flink.DataStreamCallback.class, value)); return true;
+        case "executionmode":
+        case "executionMode": target.setExecutionMode(property(camelContext, java.lang.String.class, value)); return true;
+        case "jobname":
+        case "jobName": target.setJobName(property(camelContext, java.lang.String.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+        case "maxparallelism":
+        case "maxParallelism": target.setMaxParallelism(property(camelContext, java.lang.Integer.class, value)); return true;
+        case "minpausebetweencheckpoints":
+        case "minPauseBetweenCheckpoints": target.setMinPauseBetweenCheckpoints(property(camelContext, java.lang.Long.class, value)); return true;
+        case "parallelism": target.setParallelism(property(camelContext, java.lang.Integer.class, value)); return true;
         default: return false;
         }
     }
@@ -41,6 +56,12 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
     @Override
     public Class<?> getOptionType(String name, boolean ignoreCase) {
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "checkpointinterval":
+        case "checkpointInterval": return java.lang.Long.class;
+        case "checkpointtimeout":
+        case "checkpointTimeout": return java.lang.Long.class;
+        case "checkpointingmode":
+        case "checkpointingMode": return java.lang.String.class;
         case "collect": return boolean.class;
         case "dataset":
         case "dataSet": return org.apache.flink.api.java.DataSet.class;
@@ -50,8 +71,17 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
         case "dataStream": return org.apache.flink.streaming.api.datastream.DataStream.class;
         case "datastreamcallback":
         case "dataStreamCallback": return org.apache.camel.component.flink.DataStreamCallback.class;
+        case "executionmode":
+        case "executionMode": return java.lang.String.class;
+        case "jobname":
+        case "jobName": return java.lang.String.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
+        case "maxparallelism":
+        case "maxParallelism": return java.lang.Integer.class;
+        case "minpausebetweencheckpoints":
+        case "minPauseBetweenCheckpoints": return java.lang.Long.class;
+        case "parallelism": return java.lang.Integer.class;
         default: return null;
         }
     }
@@ -60,6 +90,12 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
     public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
         FlinkEndpoint target = (FlinkEndpoint) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
+        case "checkpointinterval":
+        case "checkpointInterval": return target.getCheckpointInterval();
+        case "checkpointtimeout":
+        case "checkpointTimeout": return target.getCheckpointTimeout();
+        case "checkpointingmode":
+        case "checkpointingMode": return target.getCheckpointingMode();
         case "collect": return target.isCollect();
         case "dataset":
         case "dataSet": return target.getDataSet();
@@ -69,8 +105,17 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement
         case "dataStream": return target.getDataStream();
         case "datastreamcallback":
         case "dataStreamCallback": return target.getDataStreamCallback();
+        case "executionmode":
+        case "executionMode": return target.getExecutionMode();
+        case "jobname":
+        case "jobName": return target.getJobName();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
+        case "maxparallelism":
+        case "maxParallelism": return target.getMaxParallelism();
+        case "minpausebetweencheckpoints":
+        case "minPauseBetweenCheckpoints": return target.getMinPauseBetweenCheckpoints();
+        case "parallelism": return target.getParallelism();
         default: return null;
         }
     }
diff --git a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java
index 1807f4f165a0..b3d97dff643e 100644
--- a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java
+++ b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java
@@ -23,14 +23,22 @@ public class FlinkEndpointUriFactory extends org.apache.camel.support.component.
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(7);
+ Set<String> props = new HashSet<>(15);
+ props.add("checkpointInterval");
+ props.add("checkpointTimeout");
+ props.add("checkpointingMode");
props.add("collect");
props.add("dataSet");
props.add("dataSetCallback");
props.add("dataStream");
props.add("dataStreamCallback");
props.add("endpointType");
+ props.add("executionMode");
+ props.add("jobName");
props.add("lazyStartProducer");
+ props.add("maxParallelism");
+ props.add("minPauseBetweenCheckpoints");
+ props.add("parallelism");
PROPERTY_NAMES = Collections.unmodifiableSet(props);
SECRET_PROPERTY_NAMES = Collections.emptySet();
MULTI_VALUE_PREFIXES = Collections.emptyMap();
diff --git a/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json b/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json
index 85f0a4f96399..a19a18dabbb5 100644
--- a/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json
+++ b/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json
@@ -24,7 +24,7 @@
"remote": true
},
"componentProperties": {
- "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data
Set Callback", "group": "producer", "label": "", "required": false, "type":
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback",
"deprecated": false, "autowired": false, "secret": false, "description":
"Function performing action against a DataSet." },
+ "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data
Set Callback", "group": "producer", "label": "", "required": false, "type":
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback",
"deprecated": true, "autowired": false, "secret": false, "description":
"Function performing action against a DataSet." },
"dataStream": { "index": 1, "kind": "property", "displayName": "Data
Stream", "group": "producer", "label": "", "required": false, "type": "object",
"javaType": "org.apache.flink.streaming.api.datastream.DataStream",
"deprecated": false, "autowired": false, "secret": false, "description":
"DataStream to compute against." },
"dataStreamCallback": { "index": 2, "kind": "property", "displayName":
"Data Stream Callback", "group": "producer", "label": "", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.flink.DataStreamCallback", "deprecated": false,
"autowired": false, "secret": false, "description": "Function performing action
against a DataStream." },
"lazyStartProducer": { "index": 3, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fail [...]
@@ -43,6 +43,14 @@
"dataSetCallback": { "index": 3, "kind": "parameter", "displayName": "Data
Set Callback", "group": "producer", "label": "", "required": false, "type":
"object", "javaType": "org.apache.camel.component.flink.DataSetCallback",
"deprecated": false, "autowired": false, "secret": false, "description":
"Function performing action against a DataSet." },
"dataStream": { "index": 4, "kind": "parameter", "displayName": "Data
Stream", "group": "producer", "label": "", "required": false, "type": "object",
"javaType": "org.apache.flink.streaming.api.datastream.DataStream",
"deprecated": false, "autowired": false, "secret": false, "description":
"DataStream to compute against." },
"dataStreamCallback": { "index": 5, "kind": "parameter", "displayName":
"Data Stream Callback", "group": "producer", "label": "", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.flink.DataStreamCallback", "deprecated": false,
"autowired": false, "secret": false, "description": "Function performing action
against a DataStream." },
- "lazyStartProducer": { "index": 6, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produc [...]
+ "checkpointingMode": { "index": 6, "kind": "parameter", "displayName":
"Checkpointing Mode", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "enum", "javaType":
"java.lang.String", "enum": [ "EXACTLY_ONCE", "AT_LEAST_ONCE" ], "deprecated":
false, "autowired": false, "secret": false, "description": "Checkpointing mode:
EXACTLY_ONCE (default) or AT_LEAST_ONCE. EXACTLY_ONCE provides stronger
guarantees but may have higher overhead." },
+ "checkpointInterval": { "index": 7, "kind": "parameter", "displayName":
"Checkpoint Interval", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false,
"description": "Interval in milliseconds between checkpoints. Enables
checkpointing when set. Recommended for streaming jobs to ensure fault
tolerance." },
+ "checkpointTimeout": { "index": 8, "kind": "parameter", "displayName":
"Checkpoint Timeout", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false,
"description": "Timeout in milliseconds for checkpoints. Checkpoints that take
longer will be aborted." },
+ "executionMode": { "index": 9, "kind": "parameter", "displayName":
"Execution Mode", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "enum", "javaType": "java.lang.String", "enum": [
"STREAMING", "BATCH", "AUTOMATIC" ], "deprecated": false, "autowired": false,
"secret": false, "description": "Execution mode for the Flink job. Options:
STREAMING (default), BATCH, AUTOMATIC. BATCH mode is recommended for bounded
streams (batch processing)." },
+ "jobName": { "index": 10, "kind": "parameter", "displayName": "Job Name",
"group": "producer (advanced)", "label": "producer,advanced", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Name for the Flink job.
Useful for identification in the Flink UI and logs." },
+ "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "maxParallelism": { "index": 12, "kind": "parameter", "displayName": "Max
Parallelism", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "integer", "javaType": "java.lang.Integer",
"deprecated": false, "autowired": false, "secret": false, "description":
"Maximum parallelism for the Flink job. Defines the upper bound for dynamic
scaling and the number of key groups for stateful operators." },
+ "minPauseBetweenCheckpoints": { "index": 13, "kind": "parameter",
"displayName": "Min Pause Between Checkpoints", "group": "producer (advanced)",
"label": "producer,advanced", "required": false, "type": "integer", "javaType":
"java.lang.Long", "deprecated": false, "autowired": false, "secret": false,
"description": "Minimum pause in milliseconds between consecutive checkpoints.
Helps prevent checkpoint storms under heavy load." },
+ "parallelism": { "index": 14, "kind": "parameter", "displayName":
"Parallelism", "group": "producer (advanced)", "label": "producer,advanced",
"required": false, "type": "integer", "javaType": "java.lang.Integer",
"deprecated": false, "autowired": false, "secret": false, "description":
"Parallelism for the Flink job. If not set, uses the default parallelism of the
execution environment." }
}
}
diff --git a/components/camel-flink/src/main/docs/flink-component.adoc b/components/camel-flink/src/main/docs/flink-component.adoc
index d4172f7e4133..8e86f995c957 100644
--- a/components/camel-flink/src/main/docs/flink-component.adoc
+++ b/components/camel-flink/src/main/docs/flink-component.adoc
@@ -44,6 +44,10 @@
flink:dataset?dataset=#myDataSet&dataSetCallback=#dataSetCallback
flink:datastream?datastream=#myDataStream&dataStreamCallback=#dataStreamCallback
-------------------------------------------------
+IMPORTANT: The DataSet API has been deprecated by Apache Flink since version 1.12.
+It is recommended to migrate to the DataStream API with bounded streams for batch processing.
+See the Migration Guide section below for more details.
+
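+For illustration, a route that produced to the deprecated DataSet endpoint can typically be rewritten against the DataStream endpoint as sketched below (bean names are placeholders):
+
+[source,java]
+-----------------------------------
+// Before: deprecated DataSet API
+from("direct:process")
+    .to("flink:dataset?dataSet=#myDataSet&dataSetCallback=#myDataSetCallback");
+
+// After: DataStream API, with BATCH execution mode for bounded input
+from("direct:process")
+    .to("flink:datastream?dataStream=#myDataStream&dataStreamCallback=#myDataStreamCallback&executionMode=BATCH");
+-----------------------------------
+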
// component-configure options: START
@@ -116,5 +120,211 @@ try {
}
-----------------------------------
+=== Modern DataStream Batch Processing Example
+
+The recommended approach using the DataStream API in batch mode:
+
+[source,java]
+-----------------------------------
+@Bean
+public StreamExecutionEnvironment streamExecutionEnvironment() {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // Configure for batch processing
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ return env;
+}
+
+@Bean
+public DataStreamSource<String> myDataStream(StreamExecutionEnvironment env) {
+ return env.readTextFile("src/test/resources/testds.txt");
+}
+
+@Bean
+public DataStreamCallback wordCountCallback() {
+ return new VoidDataStreamCallback() {
+ @Override
+        public void doOnDataStream(DataStream dataStream, Object... payloads) throws Exception {
+ dataStream
+                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
+ for (String word : line.split("\\s+")) {
+ out.collect(Tuple2.of(word, 1));
+ }
+ })
+ .returns(Types.TUPLE(Types.STRING, Types.INT))
+ .keyBy(tuple -> tuple.f0)
+ .sum(1)
+ .print();
+ }
+ };
+}
+
+// In your route
+from("direct:wordCount")
+    .to("flink:datastream?dataStream=#myDataStream&dataStreamCallback=#wordCountCallback");
+-----------------------------------
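+
+Sending a message to the route triggers the callback; the message body is passed to the callback as a payload (none is needed here, so a null body is enough). A minimal trigger using a ProducerTemplate:
+
+[source,java]
+-----------------------------------
+template.sendBody("direct:wordCount", null);
+-----------------------------------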
+
+=== Real-time Streaming Example
+
+For true streaming use cases with unbounded data:
+
+[source,java]
+-----------------------------------
+@Bean
+public StreamExecutionEnvironment streamingEnvironment() {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // Configure for streaming (default mode)
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ // Enable checkpointing for fault tolerance
+ env.enableCheckpointing(10000); // checkpoint every 10 seconds
+ return env;
+}
+
+@Bean
+public DataStreamCallback streamingProcessCallback() {
+ return new VoidDataStreamCallback() {
+ @Override
+        public void doOnDataStream(DataStream dataStream, Object... payloads) throws Exception {
+ dataStream
+ .map(event -> processEvent(event))
+ .keyBy(event -> event.getKey())
+ .window(TumblingEventTimeWindows.of(Time.minutes(5)))
+ .aggregate(new MyAggregateFunction())
+ .addSink(new MyCustomSink());
+
+ // Execute the streaming job
+ dataStream.getExecutionEnvironment().execute("Streaming Job");
+ }
+ };
+}
+-----------------------------------
+
+=== Advanced Configuration Examples
+
+==== Batch Processing with Configuration
+
+Configure a DataStream endpoint for batch processing with custom settings:
+
+[source,java]
+-----------------------------------
+from("direct:batchProcess")
+ .to("flink:datastream?dataStream=#myDataStream"
+ + "&dataStreamCallback=#myCallback"
+ + "&executionMode=BATCH"
+ + "¶llelism=4"
+ + "&jobName=MyBatchJob");
+-----------------------------------
+
+==== Streaming with Checkpointing
+
+Configure a streaming job with checkpointing for fault tolerance:
+
+[source,java]
+-----------------------------------
+from("direct:streamProcess")
+ .to("flink:datastream?dataStream=#myDataStream"
+ + "&dataStreamCallback=#streamCallback"
+ + "&executionMode=STREAMING"
+ + "&checkpointInterval=60000" // Checkpoint every 60
seconds
+ + "&checkpointingMode=EXACTLY_ONCE" // Exactly-once semantics
+ + "&checkpointTimeout=120000" // 2 minute timeout
+ + "&minPauseBetweenCheckpoints=30000" // 30 second minimum pause
+ + "¶llelism=8"
+ + "&maxParallelism=128"
+ + "&jobName=StreamingPipeline");
+-----------------------------------
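+
+These URI options mirror calls on Flink's StreamExecutionEnvironment, which is what the producer applies internally. A rough programmatic equivalent, shown only to clarify the mapping:
+
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.STREAMING);                             // executionMode
+env.enableCheckpointing(60000);                                                 // checkpointInterval
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpointingMode
+env.getCheckpointConfig().setCheckpointTimeout(120000);                         // checkpointTimeout
+env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);                 // minPauseBetweenCheckpoints
+env.setParallelism(8);                                                          // parallelism
+env.setMaxParallelism(128);                                                     // maxParallelism
+-----------------------------------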
+
+==== Configuration Options Reference
+
+[cols="1,1,1,3", options="header"]
+|===
+|Parameter |Type |Default |Description
+
+|executionMode
+|String
+|STREAMING
+|Runtime execution mode: STREAMING, BATCH, or AUTOMATIC. BATCH is recommended for bounded streams.
+
+|checkpointInterval
+|Long
+|null
+|Checkpoint interval in milliseconds. Setting this enables checkpointing.
+
+|checkpointingMode
+|String
+|EXACTLY_ONCE
+|Checkpointing mode: EXACTLY_ONCE or AT_LEAST_ONCE.
+
+|checkpointTimeout
+|Long
+|10 minutes
+|Maximum time in milliseconds that a checkpoint may take.
+
+|minPauseBetweenCheckpoints
+|Long
+|0
+|Minimum time in milliseconds between consecutive checkpoints.
+
+|parallelism
+|Integer
+|Default parallelism
+|Parallelism for the Flink job operations.
+
+|maxParallelism
+|Integer
+|128
+|Maximum parallelism, which defines the upper bound for dynamic scaling.
+
+|jobName
+|String
+|null
+|Name for the Flink job, useful for identification in Flink UI.
+
+|collect
+|Boolean
+|true
+|Whether to collect results (not applicable for unbounded streams).
+|===
+
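+Like any Camel endpoint option, these values can also come from property placeholders, which keeps tuning values out of the route definition. A sketch (the property keys are illustrative):
+
+[source,java]
+-----------------------------------
+from("direct:tunedProcess")
+    .to("flink:datastream?dataStream=#myDataStream"
+        + "&dataStreamCallback=#myCallback"
+        + "&checkpointInterval={{flink.checkpoint.interval}}"
+        + "&parallelism={{flink.parallelism}}");
+-----------------------------------
+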
+
+=== Common Patterns
+
+==== Counting Elements
+
+.Before (DataSet)
+[source,java]
+-----------------------------------
+long count = dataSet.count();
+-----------------------------------
+
+.After (DataStream)
+[source,java]
+-----------------------------------
+// reduce requires a keyed stream, so key by a constant to emit a running count
+dataStream
+    .map(e -> 1L)
+    .returns(Types.LONG)
+    .keyBy(v -> 0)
+    .reduce(Long::sum)
+    .print();
+-----------------------------------
+
+==== Collecting Results
+
+.Before (DataSet)
+[source,java]
+-----------------------------------
+List<String> results = dataSet.collect();
+-----------------------------------
+
+.After (DataStream)
+[source,java]
+-----------------------------------
+// In batch mode, use executeAndCollect() or a sink
+List<String> results = dataStream.executeAndCollect(1000);
+-----------------------------------
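+
+For large result sets, an alternative (available since Flink 1.12) is to iterate results lazily with Flink's CloseableIterator instead of materializing a full list:
+
+[source,java]
+-----------------------------------
+// Results are streamed back as the job produces them
+try (CloseableIterator<String> it = dataStream.executeAndCollect()) {
+    while (it.hasNext()) {
+        handle(it.next()); // handle(..) is a placeholder for application logic
+    }
+}
+-----------------------------------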
+
+=== Additional Resources
+
+* https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/dataset/overview/[Apache Flink DataSet API Migration Guide]
+* https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/overview/[DataStream API Documentation]
+* https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/execution_mode/[Batch Execution Mode]
include::spring-boot:partial$starter.adoc[]
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java
index f91bd9a366c4..9ba05cb3b0cc 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java
@@ -21,6 +21,15 @@ import org.apache.flink.api.java.DataSet;
import static java.lang.String.format;
+/**
+ * DataSet callback with automatic type conversion for payloads.
+ *
+ * @param <T> results type
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
+ *             See the Flink migration guide for details on migrating from DataSet to DataStream API. This class
+ *             will be maintained for backward compatibility but may be removed in future versions.
+ */
+@Deprecated(since = "4.16.0")
 public abstract class ConvertingDataSetCallback<T> implements DataSetCallback<T> {
private final CamelContext camelContext;
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java
index 3979bfe9208b..93a253250c1d 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java
@@ -21,8 +21,12 @@ import org.apache.flink.api.java.DataSet;
/**
  * Generic block of code with parameters which can be executed against DataSet and return results.
  *
- * @param <T> results type
+ * @param <T>        results type
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
+ *             See the Flink migration guide for details on migrating from DataSet to DataStream API. This class
+ *             will be maintained for backward compatibility but may be removed in future versions.
  */
+@Deprecated(since = "4.16.0")
public interface DataSetCallback<T> {
T onDataSet(DataSet ds, Object... payloads);
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java
index 75cb4798d890..dc038b76a8dc 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java
@@ -22,6 +22,13 @@ import org.apache.camel.Exchange;
import org.apache.camel.support.DefaultProducer;
import org.apache.flink.api.java.DataSet;
+/**
+ * Producer for executing Flink DataSet operations.
+ *
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See
+ *             the Flink migration guide for details on migrating from DataSet to DataStream API.
+ */
+@Deprecated(since = "4.16.0")
public class DataSetFlinkProducer extends DefaultProducer {
public DataSetFlinkProducer(FlinkEndpoint endpoint) {
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java
index 53d8bf5bac7d..25a359694713 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java
@@ -20,10 +20,23 @@ import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.support.DefaultProducer;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Producer for executing Flink DataStream operations with support for modern Flink features including execution mode
+ * configuration, checkpointing, and parallelism settings.
+ */
 public class DataStreamFlinkProducer extends DefaultProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(DataStreamFlinkProducer.class);
+
+ private volatile boolean environmentConfigured = false;
+
public DataStreamFlinkProducer(FlinkEndpoint endpoint) {
super(endpoint);
}
@@ -31,6 +44,17 @@ public class DataStreamFlinkProducer extends DefaultProducer {
@Override
public void process(Exchange exchange) throws Exception {
DataStream ds = resolveDataStream(exchange);
+
+ // Configure environment on first use when DataStream is available
+ if (!environmentConfigured && ds != null) {
+ synchronized (this) {
+ if (!environmentConfigured) {
+ configureStreamExecutionEnvironment(ds);
+ environmentConfigured = true;
+ }
+ }
+ }
+
         DataStreamCallback dataStreamCallback = resolveDataStreamCallback(exchange);
Object body = exchange.getIn().getBody();
Object result = body instanceof List
@@ -76,4 +100,77 @@ public class DataStreamFlinkProducer extends DefaultProducer {
             throw new IllegalArgumentException("Cannot resolve DataStream callback.");
}
}
+
+    /**
+     * Configures the StreamExecutionEnvironment with the settings from the endpoint. This includes execution mode,
+     * checkpointing, parallelism, and other advanced options.
+     *
+     * @param dataStream the DataStream to configure the environment for
+     */
+    protected void configureStreamExecutionEnvironment(DataStream dataStream) {
+        if (dataStream == null) {
+            LOG.debug("No DataStream provided, skipping environment configuration");
+            return;
+        }
+
+        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
+
+        // Configure execution mode (BATCH, STREAMING, AUTOMATIC)
+        if (getEndpoint().getExecutionMode() != null) {
+            try {
+                RuntimeExecutionMode mode = RuntimeExecutionMode.valueOf(getEndpoint().getExecutionMode());
+                env.setRuntimeMode(mode);
+                LOG.info("Set Flink runtime execution mode to: {}", mode);
+            } catch (IllegalArgumentException e) {
+                LOG.warn("Invalid execution mode '{}'. Valid values are: STREAMING, BATCH, AUTOMATIC",
+                        getEndpoint().getExecutionMode());
+            }
+        }
+
+        // Configure parallelism
+        if (getEndpoint().getParallelism() != null) {
+            env.setParallelism(getEndpoint().getParallelism());
+            LOG.info("Set Flink parallelism to: {}", getEndpoint().getParallelism());
+        }
+
+        // Configure max parallelism
+        if (getEndpoint().getMaxParallelism() != null) {
+            env.setMaxParallelism(getEndpoint().getMaxParallelism());
+            LOG.info("Set Flink max parallelism to: {}", getEndpoint().getMaxParallelism());
+        }
+
+        // Configure checkpointing
+        if (getEndpoint().getCheckpointInterval() != null && getEndpoint().getCheckpointInterval() > 0) {
+            env.enableCheckpointing(getEndpoint().getCheckpointInterval());
+            LOG.info("Enabled checkpointing with interval: {} ms", getEndpoint().getCheckpointInterval());
+
+            // Configure checkpointing mode
+            if (getEndpoint().getCheckpointingMode() != null) {
+                try {
+                    CheckpointingMode mode = CheckpointingMode.valueOf(getEndpoint().getCheckpointingMode());
+                    env.getCheckpointConfig().setCheckpointingMode(mode);
+                    LOG.info("Set checkpointing mode to: {}", mode);
+                } catch (IllegalArgumentException e) {
+                    LOG.warn("Invalid checkpointing mode '{}'. Valid values are: EXACTLY_ONCE, AT_LEAST_ONCE",
+                            getEndpoint().getCheckpointingMode());
+                }
+            }
+
+            // Configure checkpoint timeout
+            if (getEndpoint().getCheckpointTimeout() != null) {
+                env.getCheckpointConfig().setCheckpointTimeout(getEndpoint().getCheckpointTimeout());
+                LOG.info("Set checkpoint timeout to: {} ms", getEndpoint().getCheckpointTimeout());
+            }
+
+            // Configure min pause between checkpoints
+            if (getEndpoint().getMinPauseBetweenCheckpoints() != null) {
+                env.getCheckpointConfig()
+                        .setMinPauseBetweenCheckpoints(getEndpoint().getMinPauseBetweenCheckpoints());
+                LOG.info("Set min pause between checkpoints to: {} ms",
+                        getEndpoint().getMinPauseBetweenCheckpoints());
+            }
+        }
+
+        LOG.debug("StreamExecutionEnvironment configuration completed");
+    }
}
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
index f0f950153ad3..7c1ee50a48a2 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
@@ -58,7 +58,10 @@ public class FlinkComponent extends DefaultComponent {
/**
* DataSet to compute against.
+ *
+     * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
*/
+ @Deprecated(since = "4.16.0")
public void setDataSet(DataSet ds) {
this.ds = ds;
}
@@ -80,7 +83,10 @@ public class FlinkComponent extends DefaultComponent {
/**
* Function performing action against a DataSet.
+ *
+     * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
*/
+ @Deprecated(since = "4.16.0")
public void setDataSetCallback(DataSetCallback dataSetCallback) {
this.dataSetCallback = dataSetCallback;
}
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
index 5d2eea15ae8a..8bffd74cf250 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
@@ -48,6 +48,22 @@ public class FlinkEndpoint extends DefaultEndpoint {
private DataStreamCallback dataStreamCallback;
@UriParam(defaultValue = "true")
private boolean collect = true;
+ @UriParam(label = "producer,advanced", enums = "STREAMING,BATCH,AUTOMATIC")
+ private String executionMode;
+ @UriParam(label = "producer,advanced")
+ private Long checkpointInterval;
+ @UriParam(label = "producer,advanced", enums =
"EXACTLY_ONCE,AT_LEAST_ONCE")
+ private String checkpointingMode;
+ @UriParam(label = "producer,advanced")
+ private Integer parallelism;
+ @UriParam(label = "producer,advanced")
+ private Integer maxParallelism;
+ @UriParam(label = "producer,advanced")
+ private String jobName;
+ @UriParam(label = "producer,advanced")
+ private Long checkpointTimeout;
+ @UriParam(label = "producer,advanced")
+ private Long minPauseBetweenCheckpoints;
     public FlinkEndpoint(String endpointUri, FlinkComponent component, EndpointType endpointType) {
super(endpointUri, component);
@@ -105,7 +121,10 @@ public class FlinkEndpoint extends DefaultEndpoint {
/**
* DataSet to compute against.
+ *
+     * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
*/
+ @Deprecated(since = "4.16.0")
public void setDataSet(DataSet ds) {
this.dataSet = ds;
}
@@ -127,7 +146,10 @@ public class FlinkEndpoint extends DefaultEndpoint {
/**
* Function performing action against a DataSet.
+ *
+     * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead.
*/
+ @Deprecated(since = "4.16.0")
public void setDataSetCallback(DataSetCallback dataSetCallback) {
this.dataSetCallback = dataSetCallback;
}
@@ -149,4 +171,96 @@ public class FlinkEndpoint extends DefaultEndpoint {
public void setCollect(boolean collect) {
this.collect = collect;
}
+
+ public String getExecutionMode() {
+ return executionMode;
+ }
+
+ /**
+     * Execution mode for the Flink job. Options: STREAMING (default), BATCH, AUTOMATIC. BATCH mode is recommended for
+     * bounded streams (batch processing).
+ */
+ public void setExecutionMode(String executionMode) {
+ this.executionMode = executionMode;
+ }
+
+ public Long getCheckpointInterval() {
+ return checkpointInterval;
+ }
+
+ /**
+     * Interval in milliseconds between checkpoints. Enables checkpointing when set. Recommended for streaming jobs to
+     * ensure fault tolerance.
+ */
+ public void setCheckpointInterval(Long checkpointInterval) {
+ this.checkpointInterval = checkpointInterval;
+ }
+
+ public String getCheckpointingMode() {
+ return checkpointingMode;
+ }
+
+ /**
+     * Checkpointing mode: EXACTLY_ONCE (default) or AT_LEAST_ONCE. EXACTLY_ONCE provides stronger guarantees but may
+     * have higher overhead.
+ */
+ public void setCheckpointingMode(String checkpointingMode) {
+ this.checkpointingMode = checkpointingMode;
+ }
+
+ public Integer getParallelism() {
+ return parallelism;
+ }
+
+ /**
+     * Parallelism for the Flink job. If not set, uses the default parallelism of the execution environment.
+ */
+ public void setParallelism(Integer parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ public Integer getMaxParallelism() {
+ return maxParallelism;
+ }
+
+ /**
+     * Maximum parallelism for the Flink job. Defines the upper bound for dynamic scaling and the number of key groups
+     * for stateful operators.
+ */
+ public void setMaxParallelism(Integer maxParallelism) {
+ this.maxParallelism = maxParallelism;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ /**
+     * Name for the Flink job. Useful for identification in the Flink UI and logs.
+ */
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public Long getCheckpointTimeout() {
+ return checkpointTimeout;
+ }
+
+ /**
+     * Timeout in milliseconds for checkpoints. Checkpoints that take longer will be aborted.
+ */
+ public void setCheckpointTimeout(Long checkpointTimeout) {
+ this.checkpointTimeout = checkpointTimeout;
+ }
+
+ public Long getMinPauseBetweenCheckpoints() {
+ return minPauseBetweenCheckpoints;
+ }
+
+ /**
+     * Minimum pause in milliseconds between consecutive checkpoints. Helps prevent checkpoint storms under heavy load.
+ */
+    public void setMinPauseBetweenCheckpoints(Long minPauseBetweenCheckpoints) {
+ this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+ }
}
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java
index 250ea1b3d95e..0230da4a92ae 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java
@@ -18,6 +18,14 @@ package org.apache.camel.component.flink;
import org.apache.flink.api.java.DataSet;
+/**
+ * Void implementation of DataSetCallback for operations that don't return results.
+ *
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See
+ *             the Flink migration guide for details on migrating from DataSet to DataStream API. This class will be
+ *             maintained for backward compatibility but may be removed in future versions.
+ */
+@Deprecated(since = "4.16.0")
public abstract class VoidDataSetCallback implements DataSetCallback<Void> {
public abstract void doOnDataSet(DataSet ds, Object... payloads);
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java
index bcd1cc1e6dc0..885427ff5a78 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java
@@ -32,7 +32,12 @@ import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation;
 /**
  * Provides facade for working with annotated DataSet callbacks i.e. POJO classes with an appropriate annotations on
  * selected methods.
+ *
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See
+ *             the Flink migration guide for details on migrating from DataSet to DataStream API. This class will be
+ *             maintained for backward compatibility but may be removed in future versions.
  */
+@Deprecated(since = "4.16.0")
 public class AnnotatedDataSetCallback implements org.apache.camel.component.flink.DataSetCallback {
private final Object objectWithCallback;
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java
index 2f02ce20e6c6..ad177103fe16 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java
@@ -22,6 +22,14 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+/**
+ * Annotation for marking methods as DataSet callbacks.
+ *
+ * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See
+ *             the Flink migration guide for details on migrating from DataSet to DataStream API. This annotation will
+ *             be maintained for backward compatibility but may be removed in future versions.
+ */
+@Deprecated(since = "4.16.0")
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.PARAMETER })
@Inherited
diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamBatchProcessingIT.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamBatchProcessingIT.java
new file mode 100644
index 000000000000..28c40445dec2
--- /dev/null
+++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamBatchProcessingIT.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for DataStream batch processing with various configurations. Tests end-to-end scenarios with
+ * actual data transformations.
+ */
+public class DataStreamBatchProcessingIT extends CamelTestSupport {
+
+ // Create separate environments for isolation
+    StreamExecutionEnvironment batchEnv = Flinks.createStreamExecutionEnvironment();
+    StreamExecutionEnvironment transformEnv = Flinks.createStreamExecutionEnvironment();
+
+ @BindToRegistry("numberStream")
+    private DataStreamSource<Integer> numberStream = batchEnv.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+ @BindToRegistry("textStream")
+    private DataStreamSource<String> textStream
+            = transformEnv.fromElements("apache", "camel", "flink", "integration", "test");
+
+ @BindToRegistry("multiplyCallback")
+ public DataStreamCallback multiplyCallback() {
+ return new VoidDataStreamCallback() {
+ @Override
+            public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                int multiplier = (Integer) payloads[0];
+                ds.map((MapFunction<Integer, Integer>) value -> value * multiplier)
+                        .print();
+ }
+ };
+ }
+
+ @Test
+ public void testBatchProcessingWithTransformation() {
+        // Verify that the callback executes without error and the transformation is set up
+        template.sendBodyAndHeader(
+                "direct:batchTransform",
+                null,
+                FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+                new VoidDataStreamCallback() {
+                    @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify environment is configured with batch mode and parallelism=2
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(2);
+
+                        // Set up transformation (won't execute in test context)
+                        ds.map((MapFunction<Integer, Integer>) value -> value * 2).print();
+                    }
+                });
+ }
+
+ @Test
+ public void testBatchProcessingWithPayload() {
+ List<Integer> results = new ArrayList<>();
+
+ template.sendBodyAndHeader(
+ "direct:withPayload",
+ 3, // multiplier
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        Assertions.assertThat(payloads).hasSize(1);
+                        int multiplier = (Integer) payloads[0];
+                        Assertions.assertThat(multiplier).isEqualTo(3);
+
+                        ds.map((MapFunction<Integer, Integer>) value -> value * multiplier)
+                                .print();
+ }
+ });
+ }
+
+ @Test
+ public void testBatchProcessingWithFilter() {
+ // Verify filter operation can be set up
+ template.sendBodyAndHeader(
+ "direct:batchFilter",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify environment configuration
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(1);
+
+ // Set up filter (won't execute in test context)
+ ds.filter(value -> ((Integer) value) % 2 == 0).print();
+ }
+ });
+ }
+
+ @Test
+ public void testStringProcessingWithBatchMode() {
+ // Verify string transformation can be set up
+ template.sendBodyAndHeader(
+ "direct:stringTransform",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify environment configuration
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(3);
+
+                        // Set up transformation
+                        ds.map((MapFunction<String, String>) String::toUpperCase).print();
+ }
+ });
+ }
+
+ @Test
+ public void testHighParallelismProcessing() {
+ // Verify high parallelism configuration
+ template.sendBodyAndHeader(
+ "direct:highParallelism",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+
+                        // Verify high parallelism was set
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(16);
+                        Assertions.assertThat(env.getMaxParallelism()).isEqualTo(256);
+
+                        // Set up transformation
+                        ds.map((MapFunction<Integer, Integer>) value -> value * value).print();
+ }
+ });
+ }
+
+ @Test
+ public void testCallbackFromRegistry() {
+ // Track that the callback was actually invoked
+ final boolean[] callbackInvoked = { false };
+
+ // Send body with multiplier and verify callback executes
+ template.sendBodyAndHeader(
+ "direct:registryCallback",
+ 5,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify the callback is invoked
+                        callbackInvoked[0] = true;
+
+                        // Verify environment configuration
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(4);
+
+                        // Verify payload was passed correctly
+                        Assertions.assertThat(payloads).hasSize(1);
+                        Assertions.assertThat(payloads[0]).isEqualTo(5);
+
+                        // Set up the transformation (using the registry callback pattern)
+                        ds.map((MapFunction<Integer, Integer>) value -> value * (Integer) payloads[0]).print();
+ }
+ });
+
+ // Verify callback was executed
+ Assertions.assertThat(callbackInvoked[0]).isTrue();
+ }
+
+ @Test
+ public void testMultipleOperations() {
+ // Verify chained operations can be set up
+ template.sendBodyAndHeader(
+ "direct:multipleOps",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+                    public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+                        // Verify environment configuration
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        Assertions.assertThat(env.getParallelism()).isEqualTo(4);
+
+                        // Set up chained operations
+                        ds.filter(value -> ((Integer) value) > 3)
+                                .map((MapFunction<Integer, Integer>) value -> value * 10)
+                                .map((MapFunction<Integer, Integer>) value -> value + 5)
+                                .print();
+ }
+ });
+ }
+
+ @Test
+ public void testConfigurationPersistsAcrossInvocations() {
+ // First invocation
+ template.sendBodyAndHeader(
+ "direct:batchTransform",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+
Assertions.assertThat(env.getParallelism()).isEqualTo(2);
+ }
+ });
+
+ // Second invocation - should have same configuration
+ template.sendBodyAndHeader(
+ "direct:batchTransform",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+
Assertions.assertThat(env.getParallelism()).isEqualTo(2);
+ }
+ });
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:batchTransform")
+ .to("flink:datastream?dataStream=#numberStream"
+ + "&executionMode=BATCH"
+ + "¶llelism=2");
+
+ from("direct:withPayload")
+ .to("flink:datastream?dataStream=#numberStream"
+ + "&executionMode=BATCH");
+
+ from("direct:batchFilter")
+ .to("flink:datastream?dataStream=#numberStream"
+ + "&executionMode=BATCH"
+ + "¶llelism=1");
+
+ from("direct:stringTransform")
+ .to("flink:datastream?dataStream=#textStream"
+ + "&executionMode=BATCH"
+ + "¶llelism=3");
+
+ from("direct:highParallelism")
+ .to("flink:datastream?dataStream=#numberStream"
+ + "&executionMode=BATCH"
+ + "¶llelism=16"
+ + "&maxParallelism=256");
+
+ from("direct:registryCallback")
+ .to("flink:datastream?dataStream=#numberStream"
+ + "&dataStreamCallback=#multiplyCallback"
+ + "&executionMode=BATCH"
+ + "¶llelism=4");
+
+ from("direct:multipleOps")
+ .to("flink:datastream?dataStream=#numberStream"
+ + "&executionMode=BATCH"
+ + "¶llelism=4");
+ }
+ };
+ }
+}
diff --git
a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamConfigurationIT.java
b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamConfigurationIT.java
new file mode 100644
index 000000000000..cb25c080f640
--- /dev/null
+++
b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamConfigurationIT.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for DataStream producer configuration options including
execution mode, checkpointing, and
+ * parallelism settings.
+ */
+public class DataStreamConfigurationIT extends CamelTestSupport {
+
+ @BindToRegistry("batchDataStream")
+ public DataStreamSource<String> createBatchDataStream() {
+ return Flinks.createStreamExecutionEnvironment().fromElements("test1",
"test2", "test3");
+ }
+
+ @BindToRegistry("streamingDataStream")
+ public DataStreamSource<String> createStreamingDataStream() {
+ return
Flinks.createStreamExecutionEnvironment().fromElements("stream1", "stream2");
+ }
+
+ @BindToRegistry("checkpointDataStream")
+ public DataStreamSource<String> createCheckpointDataStream() {
+ return Flinks.createStreamExecutionEnvironment().fromElements("data1",
"data2");
+ }
+
+ @BindToRegistry("parallelismDataStream")
+ public DataStreamSource<String> createParallelismDataStream() {
+ return
Flinks.createStreamExecutionEnvironment().fromElements("parallel1",
"parallel2");
+ }
+
+ @BindToRegistry("fullConfigDataStream")
+ public DataStreamSource<String> createFullConfigDataStream() {
+ return
Flinks.createStreamExecutionEnvironment().fromElements("config1", "config2");
+ }
+
+ @BindToRegistry("captureEnvCallback")
+ public DataStreamCallback captureEnvCallback() {
+ return new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object... payloads)
throws Exception {
+ // Just capture the environment for testing
+ ds.print();
+ }
+ };
+ }
+
+ @Test
+ public void testBatchExecutionModeConfiguration() {
+ AtomicReference<StreamExecutionEnvironment> envRef = new
AtomicReference<>();
+
+ template.sendBodyAndHeader(
+ "direct:batchMode",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+ envRef.set(env);
+ }
+ });
+
+ StreamExecutionEnvironment env = envRef.get();
+ Assertions.assertThat(env).isNotNull();
+
+ // Verify BATCH mode was set
+ RuntimeExecutionMode mode = env.getConfiguration()
+
.get(org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE);
+ Assertions.assertThat(mode).isEqualTo(RuntimeExecutionMode.BATCH);
+ }
+
+ @Test
+ public void testStreamingExecutionModeConfiguration() {
+ AtomicReference<StreamExecutionEnvironment> envRef = new
AtomicReference<>();
+
+ template.sendBodyAndHeader(
+ "direct:streamingMode",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+ envRef.set(env);
+ }
+ });
+
+ StreamExecutionEnvironment env = envRef.get();
+ Assertions.assertThat(env).isNotNull();
+
+ // Verify STREAMING mode was set
+ RuntimeExecutionMode mode = env.getConfiguration()
+
.get(org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE);
+ Assertions.assertThat(mode).isEqualTo(RuntimeExecutionMode.STREAMING);
+ }
+
+ @Test
+ public void testCheckpointingConfiguration() {
+ AtomicReference<CheckpointConfig> checkpointConfigRef = new
AtomicReference<>();
+
+ template.sendBodyAndHeader(
+ "direct:checkpointing",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+ checkpointConfigRef.set(env.getCheckpointConfig());
+ }
+ });
+
+ CheckpointConfig config = checkpointConfigRef.get();
+ Assertions.assertThat(config).isNotNull();
+
+ // Verify checkpoint interval
+ Assertions.assertThat(config.getCheckpointInterval()).isEqualTo(5000L);
+
+ // Verify checkpointing mode
+
Assertions.assertThat(config.getCheckpointingMode()).isEqualTo(CheckpointingMode.EXACTLY_ONCE);
+
+ // Verify checkpoint timeout
+ Assertions.assertThat(config.getCheckpointTimeout()).isEqualTo(30000L);
+
+ // Verify min pause between checkpoints
+
Assertions.assertThat(config.getMinPauseBetweenCheckpoints()).isEqualTo(2000L);
+ }
+
+ @Test
+ public void testParallelismConfiguration() {
+ AtomicReference<StreamExecutionEnvironment> envRef = new
AtomicReference<>();
+
+ template.sendBodyAndHeader(
+ "direct:parallelism",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+ envRef.set(env);
+ }
+ });
+
+ StreamExecutionEnvironment env = envRef.get();
+ Assertions.assertThat(env).isNotNull();
+
+ // Verify parallelism settings
+ Assertions.assertThat(env.getParallelism()).isEqualTo(4);
+ Assertions.assertThat(env.getMaxParallelism()).isEqualTo(64);
+ }
+
+ @Test
+ public void testFullConfiguration() {
+ AtomicReference<StreamExecutionEnvironment> envRef = new
AtomicReference<>();
+ AtomicReference<CheckpointConfig> checkpointConfigRef = new
AtomicReference<>();
+
+ template.sendBodyAndHeader(
+ "direct:fullConfig",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+ envRef.set(env);
+ checkpointConfigRef.set(env.getCheckpointConfig());
+ }
+ });
+
+ StreamExecutionEnvironment env = envRef.get();
+ CheckpointConfig checkpointConfig = checkpointConfigRef.get();
+
+ Assertions.assertThat(env).isNotNull();
+ Assertions.assertThat(checkpointConfig).isNotNull();
+
+ // Verify execution mode
+ RuntimeExecutionMode mode = env.getConfiguration()
+
.get(org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE);
+ Assertions.assertThat(mode).isEqualTo(RuntimeExecutionMode.STREAMING);
+
+ // Verify parallelism
+ Assertions.assertThat(env.getParallelism()).isEqualTo(8);
+ Assertions.assertThat(env.getMaxParallelism()).isEqualTo(128);
+
+ // Verify checkpointing
+
Assertions.assertThat(checkpointConfig.getCheckpointInterval()).isEqualTo(10000L);
+
Assertions.assertThat(checkpointConfig.getCheckpointingMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+
Assertions.assertThat(checkpointConfig.getCheckpointTimeout()).isEqualTo(60000L);
+
Assertions.assertThat(checkpointConfig.getMinPauseBetweenCheckpoints()).isEqualTo(5000L);
+ }
+
+ @Test
+ public void testInvalidExecutionModeHandling() {
+ // Should not throw exception, just log warning
+ template.sendBodyAndHeader(
+ "direct:invalidMode",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ // Should execute without error despite invalid mode
+ Assertions.assertThat(ds).isNotNull();
+ }
+ });
+ }
+
+ @Test
+ public void testInvalidCheckpointingModeHandling() {
+ // Should not throw exception, just log warning
+ template.sendBodyAndHeader(
+ "direct:invalidCheckpointMode",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ // Should execute without error despite invalid
checkpoint mode
+ Assertions.assertThat(ds).isNotNull();
+ }
+ });
+ }
+
+ @Test
+ public void testConfigurationViaRouteParameters() {
+ AtomicReference<StreamExecutionEnvironment> envRef = new
AtomicReference<>();
+
+ template.sendBodyAndHeader(
+ "direct:routeConfig",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+ envRef.set(env);
+ }
+ });
+
+ StreamExecutionEnvironment env = envRef.get();
+ Assertions.assertThat(env).isNotNull();
+
+ // Verify the configuration was applied via route parameters
+ Assertions.assertThat(env.getParallelism()).isEqualTo(2);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:batchMode")
+ .to("flink:datastream?dataStream=#batchDataStream"
+ + "&executionMode=BATCH");
+
+ from("direct:streamingMode")
+ .to("flink:datastream?dataStream=#streamingDataStream"
+ + "&executionMode=STREAMING");
+
+ from("direct:checkpointing")
+ .to("flink:datastream?dataStream=#checkpointDataStream"
+ + "&checkpointInterval=5000"
+ + "&checkpointingMode=EXACTLY_ONCE"
+ + "&checkpointTimeout=30000"
+ + "&minPauseBetweenCheckpoints=2000");
+
+ from("direct:parallelism")
+
.to("flink:datastream?dataStream=#parallelismDataStream"
+ + "¶llelism=4"
+ + "&maxParallelism=64");
+
+ from("direct:fullConfig")
+ .to("flink:datastream?dataStream=#fullConfigDataStream"
+ + "&executionMode=STREAMING"
+ + "&checkpointInterval=10000"
+ + "&checkpointingMode=AT_LEAST_ONCE"
+ + "&checkpointTimeout=60000"
+ + "&minPauseBetweenCheckpoints=5000"
+ + "¶llelism=8"
+ + "&maxParallelism=128"
+ + "&jobName=FullConfigTest");
+
+ from("direct:invalidMode")
+ .to("flink:datastream?dataStream=#batchDataStream"
+ + "&executionMode=INVALID_MODE"
+ + "&dataStreamCallback=#captureEnvCallback");
+
+ from("direct:invalidCheckpointMode")
+ .to("flink:datastream?dataStream=#checkpointDataStream"
+ + "&checkpointInterval=5000"
+ + "&checkpointingMode=INVALID_CHECKPOINT_MODE"
+ + "&dataStreamCallback=#captureEnvCallback");
+
+ from("direct:routeConfig")
+
.to("flink:datastream?dataStream=#parallelismDataStream"
+ + "¶llelism=2");
+ }
+ };
+ }
+}
diff --git
a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamEdgeCasesIT.java
b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamEdgeCasesIT.java
new file mode 100644
index 000000000000..9bcef953c825
--- /dev/null
+++
b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamEdgeCasesIT.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for edge cases and error handling in DataStream
configuration.
+ */
+public class DataStreamEdgeCasesIT extends CamelTestSupport {
+
+ // Create separate environment for isolation
+ StreamExecutionEnvironment testEnv =
Flinks.createStreamExecutionEnvironment();
+
+ @BindToRegistry("testDataStream")
+ private DataStreamSource<String> testDs = testEnv.fromElements("test1",
"test2");
+
+ @Test
+ public void testMissingDataStreamThrowsException() {
+ // Should throw exception when no DataStream is defined
+ Assertions.assertThatThrownBy(() -> {
+ template.sendBodyAndHeader(
+ "direct:noDataStream",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ ds.print();
+ }
+ });
+ }).isInstanceOf(CamelExecutionException.class)
+ .cause()
+ .hasMessageContaining("No DataStream defined");
+ }
+
+ @Test
+ public void testMissingCallbackThrowsException() {
+ // Should throw exception when no callback is defined
+ Assertions.assertThatThrownBy(() -> {
+ template.sendBody("direct:noCallback", null);
+ }).isInstanceOf(CamelExecutionException.class)
+ .cause()
+ .hasMessageContaining("Cannot resolve DataStream callback");
+ }
+
+ @Test
+ public void testInvalidParallelismHandling() {
+ // Flink will reject parallelism of 0 or negative values during
configuration
+ // This test verifies that the error is clear
+ Assertions.assertThatThrownBy(() -> {
+ StreamExecutionEnvironment invalidEnv =
Flinks.createStreamExecutionEnvironment();
+ invalidEnv.setParallelism(0);
+ }).isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Parallelism must be at least one");
+ }
+
+ @Test
+ public void testCheckpointWithoutIntervalIgnored() {
+ template.sendBodyAndHeader(
+ "direct:checkpointNoInterval",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+ // Checkpointing mode is set but interval is not, so
checkpointing won't be enabled
+ Assertions.assertThat(env).isNotNull();
+ }
+ });
+ }
+
+ @Test
+ public void testNullPayloadsHandling() {
+ template.sendBodyAndHeader(
+ "direct:withDataStream",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ // Null body results in single null payload
+ Assertions.assertThat(payloads).hasSize(1);
+ Assertions.assertThat(payloads[0]).isNull();
+ }
+ });
+ }
+
+ @Test
+ public void testEmptyListPayload() {
+ template.sendBodyAndHeader(
+ "direct:withDataStream",
+ java.util.Collections.emptyList(),
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ // Empty list should result in empty array
+ Assertions.assertThat(payloads).isEmpty();
+ }
+ });
+ }
+
+ @Test
+ public void testVeryHighParallelism() {
+ template.sendBodyAndHeader(
+ "direct:veryHighParallelism",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+ // Very high parallelism should be accepted (Flink
will validate)
+
Assertions.assertThat(env.getParallelism()).isEqualTo(1000);
+ }
+ });
+ }
+
+ @Test
+ public void testAutomaticExecutionMode() {
+ template.sendBodyAndHeader(
+ "direct:automaticMode",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ // AUTOMATIC mode should be accepted
+ Assertions.assertThat(ds).isNotNull();
+ }
+ });
+ }
+
+ @Test
+ public void testVeryShortCheckpointInterval() {
+ template.sendBodyAndHeader(
+ "direct:shortCheckpoint",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ StreamExecutionEnvironment env =
ds.getExecutionEnvironment();
+ // Very short interval (100ms) should be configured,
though not recommended
+
Assertions.assertThat(env.getCheckpointConfig().getCheckpointInterval()).isEqualTo(100L);
+ }
+ });
+ }
+
+ @Test
+ public void testCallbackExceptionPropagation() {
+ Assertions.assertThatThrownBy(() -> {
+ template.sendBodyAndHeader(
+ "direct:withDataStream",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ throw new RuntimeException("Test exception from
callback");
+ }
+ });
+ }).isInstanceOf(CamelExecutionException.class)
+ .cause()
+ .hasMessageContaining("Test exception from callback");
+ }
+
+ @Test
+ public void testHeaderOverridesEndpointCallback() {
+ // When both endpoint callback and header callback are present, header
should win
+ boolean[] headerCallbackCalled = { false };
+
+ template.sendBodyAndHeader(
+ "direct:withEndpointCallback",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ headerCallbackCalled[0] = true;
+ }
+ });
+
+ Assertions.assertThat(headerCallbackCalled[0]).isTrue();
+ }
+
+ @Test
+ public void testConfigurationWithAllNullOptionals() {
+ // All optional configuration parameters are null - should use defaults
+ template.sendBodyAndHeader(
+ "direct:minimalConfig",
+ null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ // Should work with defaults
+ Assertions.assertThat(ds).isNotNull();
+ }
+ });
+ }
+
+ @BindToRegistry("endpointCallback")
+ public DataStreamCallback endpointCallback() {
+ return new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object... payloads)
throws Exception {
+ throw new AssertionError("Endpoint callback should not be
called when header is present");
+ }
+ };
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:noDataStream")
+ .to("flink:datastream");
+
+ from("direct:noCallback")
+ .to("flink:datastream?dataStream=#testDataStream");
+
+ from("direct:checkpointNoInterval")
+ .to("flink:datastream?dataStream=#testDataStream"
+ + "&checkpointingMode=EXACTLY_ONCE");
+
+ from("direct:withDataStream")
+ .to("flink:datastream?dataStream=#testDataStream");
+
+ from("direct:veryHighParallelism")
+ .to("flink:datastream?dataStream=#testDataStream"
+ + "¶llelism=1000"
+ + "&maxParallelism=2000");
+
+ from("direct:automaticMode")
+ .to("flink:datastream?dataStream=#testDataStream"
+ + "&executionMode=AUTOMATIC");
+
+ from("direct:shortCheckpoint")
+ .to("flink:datastream?dataStream=#testDataStream"
+ + "&checkpointInterval=100");
+
+ from("direct:withEndpointCallback")
+ .to("flink:datastream?dataStream=#testDataStream"
+ + "&dataStreamCallback=#endpointCallback");
+
+ from("direct:minimalConfig")
+ .to("flink:datastream?dataStream=#testDataStream");
+ }
+ };
+ }
+}
diff --git
a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamProducerTest.java
b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamProducerTest.java
new file mode 100644
index 000000000000..f47230c46bc9
--- /dev/null
+++
b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamProducerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class DataStreamProducerTest extends CamelTestSupport {
+
+ static StreamExecutionEnvironment streamExecutionEnvironment =
Flinks.createStreamExecutionEnvironment();
+
+ String flinkDataStreamUri = "flink:dataStream?dataStream=#myDataStream";
+
+ @BindToRegistry("myDataStream")
+ private DataStreamSource<String> dss =
streamExecutionEnvironment.readTextFile("src/test/resources/testds.txt");
+
+ @Test
+ public void shouldExecuteDataStreamCallback() {
+ template.sendBodyAndHeader(flinkDataStreamUri, null,
FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ // Just verify the callback is executed
+ ds.print();
+ }
+ });
+ }
+
+ @Test
+ public void shouldExecuteDataStreamCallbackWithPayload() {
+ template.sendBodyAndHeader(flinkDataStreamUri, "test-payload",
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ Assertions.assertThat(payloads).hasSize(1);
+
Assertions.assertThat(payloads[0]).isEqualTo("test-payload");
+ }
+ });
+ }
+
+ @Test
+ public void shouldExecuteDataStreamCallbackWithMultiplePayloads() {
+ List<String> payloads = Arrays.asList("payload1", "payload2",
"payload3");
+ template.sendBodyAndHeader(flinkDataStreamUri, payloads,
FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream ds, Object...
payloads) throws Exception {
+ Assertions.assertThat(payloads).hasSize(3);
+
Assertions.assertThat(payloads[0]).isEqualTo("payload1");
+
Assertions.assertThat(payloads[1]).isEqualTo("payload2");
+
Assertions.assertThat(payloads[2]).isEqualTo("payload3");
+ }
+ });
+ }
+
+ @Test
+ public void shouldConfigureExecutionMode() {
+ StreamExecutionEnvironment env = streamExecutionEnvironment;
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ Assertions.assertThat(env.getConfiguration().get(
+ org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE))
+ .isEqualTo(RuntimeExecutionMode.BATCH);
+ }
+
+ @Test
+ public void shouldConfigureCheckpointing() {
+ StreamExecutionEnvironment env =
Flinks.createStreamExecutionEnvironment();
+ env.enableCheckpointing(5000);
+
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+
Assertions.assertThat(env.getCheckpointConfig().getCheckpointInterval()).isEqualTo(5000);
+ Assertions.assertThat(env.getCheckpointConfig().getCheckpointingMode())
+ .isEqualTo(CheckpointingMode.EXACTLY_ONCE);
+ }
+
+ @Test
+ public void shouldConfigureParallelism() {
+ StreamExecutionEnvironment env =
Flinks.createStreamExecutionEnvironment();
+ env.setParallelism(4);
+
+ Assertions.assertThat(env.getParallelism()).isEqualTo(4);
+ }
+
+ @Test
+ public void shouldConfigureMaxParallelism() {
+ StreamExecutionEnvironment env =
Flinks.createStreamExecutionEnvironment();
+ env.setMaxParallelism(128);
+
+ Assertions.assertThat(env.getMaxParallelism()).isEqualTo(128);
+ }
+}
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc
index 20047a339852..59049c6546d6 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc
@@ -94,4 +94,151 @@ item.setDataValueListener(dataValue -> processValue(dataValue));
 item.setDataValueListener((monitoredItem, dataValue) -> processValue(dataValue));
----
-NOTE: Most users will not be affected by these changes as they primarily affect advanced use cases where you directly interact with the Milo API. Standard camel-milo endpoint configurations remain unchanged.
\ No newline at end of file
+NOTE: Most users will not be affected by these changes as they primarily affect advanced use cases where you directly interact with the Milo API. Standard camel-milo endpoint configurations remain unchanged.
+
+=== camel-flink
+
+Apache Flink deprecated the DataSet API in version 1.12 in favor of a unified DataStream API that handles both
+streaming and batch processing. The DataStream API with bounded streams provides all the functionality of the
+DataSet API and more, with better performance and a unified programming model.
+
+==== Key Differences
+
+[cols="1,1,1", options="header"]
+|===
+|Aspect |DataSet API |DataStream API (Batch Mode)
+
+|Execution
+|Lazy; a job runs when a sink or an eager action such as `count()` or `collect()` triggers it
+|Lazy; a job runs when `env.execute()` is called
+
+|Data Type
+|Bounded datasets
+|Bounded or unbounded streams
+
+|Time Semantics
+|Not applicable
+|Event time, processing time, ingestion time
+
+|State Management
+|Limited
+|Full support for keyed and operator state
+
+|Windowing
+|Not applicable
+|Full windowing support
+|===
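+
+For example, the same program can process a bounded source as a batch job under the unified model. A minimal sketch (the input file name is illustrative):
+
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH); // bounded input, batch execution
+
+DataStream<String> lines = env.readTextFile("input.txt");
+lines.map(String::toUpperCase).print();
+
+env.execute("bounded-batch-example");
+-----------------------------------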
+
+==== Migration Guide
+
+===== Update Endpoint Type
+
+Replace `flink:dataset` with `flink:datastream`:
+
+.Before
+[source,java]
+-----------------------------------
+from("direct:start")
+ .to("flink:dataset?dataSet=#myDataSet&dataSetCallback=#myCallback");
+-----------------------------------
+
+.After
+[source,java]
+-----------------------------------
+from("direct:start")
+ .to("flink:datastream?dataStream=#myDataStream&dataStreamCallback=#myCallback");
+-----------------------------------
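+
+Both `#myDataStream` and `#myCallback` are resolved from the Camel registry. A minimal binding sketch (bean names are illustrative), following the pattern used by the component's own tests:
+
+[source,java]
+-----------------------------------
+@BindToRegistry("myDataStream")
+public DataStreamSource<String> myDataStream() {
+    // Bounded source consumed by the flink:datastream endpoint
+    return StreamExecutionEnvironment.getExecutionEnvironment().fromElements("a", "b", "c");
+}
+-----------------------------------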
+
+===== Configure Batch Execution Mode
+
+For batch processing with the DataStream API, configure the execution environment for batch mode:
+
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// Set to batch mode for bounded streams
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+-----------------------------------
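+
+Alternatively, the new `executionMode` URI option applies the same setting per endpoint, so no environment code is needed:
+
+[source,java]
+-----------------------------------
+from("direct:start")
+    .to("flink:datastream?dataStream=#myDataStream&executionMode=BATCH");
+-----------------------------------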
+
+===== Update Data Sources
+
+.Before (DataSet API)
+[source,java]
+-----------------------------------
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+DataSet<String> dataSet = env.readTextFile("input.txt");
+-----------------------------------
+
+.After (DataStream API)
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+DataStream<String> dataStream = env.readTextFile("input.txt");
+-----------------------------------
+
+===== Update Transformations
+
+Most transformation operations have direct equivalents:
+
+[cols="1,1", options="header"]
+|===
+|DataSet API |DataStream API
+
+|`map()`
+|`map()`
+
+|`flatMap()`
+|`flatMap()`
+
+|`filter()`
+|`filter()`
+
+|`reduce()`
+|`keyBy().reduce()` (plain `DataStream` has no `reduce()`; key or window the stream first)
+
+|`groupBy()`
+|`keyBy()`
+
+|`join()`
+|`join()` (with windowing)
+
+|`coGroup()`
+|`coGroup()` (with windowing)
+|===
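+
+The most common signature change is `groupBy()` to `keyBy()`. A sketch for a `Tuple2` stream (field positions assumed for illustration):
+
+.Before (DataSet)
+[source,java]
+-----------------------------------
+dataSet.groupBy(0).sum(1);
+-----------------------------------
+
+.After (DataStream)
+[source,java]
+-----------------------------------
+dataStream.keyBy(t -> t.f0).sum(1);
+-----------------------------------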
+
+===== Update Callbacks
+
+Replace `DataSetCallback` with `DataStreamCallback`:
+
+.Before (DataSet)
+[source,java]
+-----------------------------------
+@Bean
+public DataSetCallback<Long> dataSetCallback() {
+ return new DataSetCallback<Long>() {
+ public Long onDataSet(DataSet dataSet, Object... objects) {
+ try {
+ return dataSet.count();
+ } catch (Exception e) {
+ return -1L;
+ }
+ }
+ };
+}
+-----------------------------------
+
+.After (DataStream)
+[source,java]
+-----------------------------------
+@Bean
+public DataStreamCallback dataStreamCallback() {
+ return new DataStreamCallback() {
+ public Object onDataStream(DataStream dataStream, Object... objects) {
+ // Runs as a batch job when the environment's runtime mode is BATCH
+ dataStream.print();
+ return null;
+ }
+ };
+}
+-----------------------------------
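+
+A callback can also be supplied per message through the `FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER` header instead of the registry, mirroring the component's tests:
+
+[source,java]
+-----------------------------------
+template.sendBodyAndHeader("direct:start", "payload",
+    FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+    new VoidDataStreamCallback() {
+        @Override
+        public void doOnDataStream(DataStream ds, Object... payloads) throws Exception {
+            ds.print();
+        }
+    });
+-----------------------------------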
+
+NOTE: The `flink:dataset` endpoint and `DataSetCallback` remain available but are deprecated. Existing DataSet routes continue to work unchanged; plan to migrate them to `flink:datastream` before the DataSet API is removed.
diff --git
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/FlinkComponentBuilderFactory.java
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/FlinkComponentBuilderFactory.java
index 897e3dc0bb2d..7ec9ea2c6c5f 100644
---
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/FlinkComponentBuilderFactory.java
+++
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/FlinkComponentBuilderFactory.java
@@ -61,6 +61,7 @@ public interface FlinkComponentBuilderFactory {
* @param dataSetCallback the value to set
* @return the dsl builder
*/
+ @Deprecated
default FlinkComponentBuilder dataSetCallback(org.apache.camel.component.flink.DataSetCallback dataSetCallback) {
doSetProperty("dataSetCallback", dataSetCallback);
return this;
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FlinkEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FlinkEndpointBuilderFactory.java
index 11a4d5f42bee..270771d8568c 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FlinkEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FlinkEndpointBuilderFactory.java
@@ -209,6 +209,113 @@ public interface FlinkEndpointBuilderFactory {
return (FlinkEndpointBuilder) this;
}
+ /**
+ * Checkpointing mode: EXACTLY_ONCE (default) or AT_LEAST_ONCE.
+ * EXACTLY_ONCE provides stronger guarantees but may have higher
+ * overhead.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param checkpointingMode the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder checkpointingMode(String checkpointingMode) {
+ doSetProperty("checkpointingMode", checkpointingMode);
+ return this;
+ }
+ /**
+ * Interval in milliseconds between checkpoints. Enables checkpointing
+ * when set. Recommended for streaming jobs to ensure fault tolerance.
+ *
+ * The option is a: <code>java.lang.Long</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param checkpointInterval the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder checkpointInterval(Long checkpointInterval) {
+ doSetProperty("checkpointInterval", checkpointInterval);
+ return this;
+ }
+ /**
+ * Interval in milliseconds between checkpoints. Enables checkpointing
+ * when set. Recommended for streaming jobs to ensure fault tolerance.
+ *
+ * The option will be converted to a <code>java.lang.Long</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param checkpointInterval the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder checkpointInterval(String checkpointInterval) {
+ doSetProperty("checkpointInterval", checkpointInterval);
+ return this;
+ }
+ /**
+ * Timeout in milliseconds for checkpoints. Checkpoints that take longer
+ * will be aborted.
+ *
+ * The option is a: <code>java.lang.Long</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param checkpointTimeout the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder checkpointTimeout(Long checkpointTimeout) {
+ doSetProperty("checkpointTimeout", checkpointTimeout);
+ return this;
+ }
+ /**
+ * Timeout in milliseconds for checkpoints. Checkpoints that take longer
+ * will be aborted.
+ *
+ * The option will be converted to a <code>java.lang.Long</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param checkpointTimeout the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder checkpointTimeout(String checkpointTimeout) {
+ doSetProperty("checkpointTimeout", checkpointTimeout);
+ return this;
+ }
+ /**
+ * Execution mode for the Flink job. Options: STREAMING (default),
+ * BATCH, AUTOMATIC. BATCH mode is recommended for bounded streams
+ * (batch processing).
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param executionMode the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder executionMode(String executionMode) {
+ doSetProperty("executionMode", executionMode);
+ return this;
+ }
+ /**
+ * Name for the Flink job. Useful for identification in the Flink UI and
+ * logs.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param jobName the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder jobName(String jobName) {
+ doSetProperty("jobName", jobName);
+ return this;
+ }
/**
* Whether the producer should be started lazy (on the first message).
* By starting lazy you can use this to allow CamelContext and routes to
@@ -255,6 +362,98 @@ public interface FlinkEndpointBuilderFactory {
doSetProperty("lazyStartProducer", lazyStartProducer);
return this;
}
+ /**
+ * Maximum parallelism for the Flink job. Defines the upper bound for
+ * dynamic scaling and the number of key groups for stateful operators.
+ *
+ * The option is a: <code>java.lang.Integer</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param maxParallelism the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder maxParallelism(Integer maxParallelism) {
+ doSetProperty("maxParallelism", maxParallelism);
+ return this;
+ }
+ /**
+ * Maximum parallelism for the Flink job. Defines the upper bound for
+ * dynamic scaling and the number of key groups for stateful operators.
+ *
+ * The option will be converted to a <code>java.lang.Integer</code>
+ * type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param maxParallelism the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder maxParallelism(String maxParallelism) {
+ doSetProperty("maxParallelism", maxParallelism);
+ return this;
+ }
+ /**
+ * Minimum pause in milliseconds between consecutive checkpoints. Helps
+ * prevent checkpoint storms under heavy load.
+ *
+ * The option is a: <code>java.lang.Long</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param minPauseBetweenCheckpoints the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder minPauseBetweenCheckpoints(Long minPauseBetweenCheckpoints) {
+ doSetProperty("minPauseBetweenCheckpoints", minPauseBetweenCheckpoints);
+ return this;
+ }
+ /**
+ * Minimum pause in milliseconds between consecutive checkpoints. Helps
+ * prevent checkpoint storms under heavy load.
+ *
+ * The option will be converted to a <code>java.lang.Long</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param minPauseBetweenCheckpoints the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder minPauseBetweenCheckpoints(String minPauseBetweenCheckpoints) {
+ doSetProperty("minPauseBetweenCheckpoints", minPauseBetweenCheckpoints);
+ return this;
+ }
+ /**
+ * Parallelism for the Flink job. If not set, uses the default
+ * parallelism of the execution environment.
+ *
+ * The option is a: <code>java.lang.Integer</code> type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param parallelism the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder parallelism(Integer parallelism) {
+ doSetProperty("parallelism", parallelism);
+ return this;
+ }
+ /**
+ * Parallelism for the Flink job. If not set, uses the default
+ * parallelism of the execution environment.
+ *
+ * The option will be converted to a <code>java.lang.Integer</code>
+ * type.
+ *
+ * Group: producer (advanced)
+ *
+ * @param parallelism the value to set
+ * @return the dsl builder
+ */
+ default AdvancedFlinkEndpointBuilder parallelism(String parallelism) {
+ doSetProperty("parallelism", parallelism);
+ return this;
+ }
}
public interface FlinkBuilders {