[CAMEL-9869] Create Apache Flink Component

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/702db0c9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/702db0c9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/702db0c9

Branch: refs/heads/master
Commit: 702db0c9832b5549ecc188d99f91c946775e052f
Parents: 01ec93d
Author: Subhobrata Dey <sbc...@gmail.com>
Authored: Tue Apr 26 16:30:52 2016 -0400
Committer: Subhobrata Dey <sbc...@gmail.com>
Committed: Wed Apr 27 11:31:28 2016 -0400

----------------------------------------------------------------------
 components/camel-flink/pom.xml                  |  8 +-
 .../component/flink/DataStreamCallback.java     | 29 ++++++++
 .../flink/DataStreamFlinkProducer.java          | 78 ++++++++++++++++++++
 .../camel/component/flink/FlinkComponent.java   | 25 +++++++
 .../camel/component/flink/FlinkConstants.java   |  5 +-
 .../camel/component/flink/FlinkEndpoint.java    | 34 ++++++++-
 .../apache/camel/component/flink/Flinks.java    |  6 ++
 .../component/flink/VoidDataStreamCallback.java | 30 ++++++++
 .../component/flink/FlinkProducerTest.java      | 38 +++++++---
 9 files changed, 241 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-flink/pom.xml b/components/camel-flink/pom.xml
index f176dd4..4b7b2f3 100644
--- a/components/camel-flink/pom.xml
+++ b/components/camel-flink/pom.xml
@@ -37,7 +37,6 @@
   </properties>
 
   <dependencies>
-
     <!--camel-->
     <dependency>
       <groupId>org.apache.camel</groupId>
@@ -55,6 +54,13 @@
       <artifactId>flink-clients</artifactId>
       <version>${flink-version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java</artifactId>
+      <version>${flink-version}</version>
+    </dependency>
+
+    <!--flink-->
 
     <!-- scala -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamCallback.java
new file mode 100644
index 0000000..3038695
--- /dev/null
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamCallback.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Generic block of code with parameters which can be executed against DataStream and return results.
+ *
+ * @param <T> results type
+ */
+public interface DataStreamCallback<T> {
+
+    T onDataStream(DataStream ds, Object... payloads) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java
new file mode 100644
index 0000000..bc1aa16
--- /dev/null
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import java.util.List;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+public class DataStreamFlinkProducer extends DefaultProducer {
+
+    public DataStreamFlinkProducer(FlinkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        DataStream ds = resolveDataStream(exchange);
+        DataStreamCallback dataStreamCallback = 
resolveDataStreamCallback(exchange);
+        Object body = exchange.getIn().getBody();
+        Object result = body instanceof List ? 
dataStreamCallback.onDataStream(ds, ((List) body).toArray(new Object[0])) : 
dataStreamCallback.onDataStream(ds, body);
+        collectResults(exchange, result);
+    }
+
+    @Override
+    public FlinkEndpoint getEndpoint() {
+        return (FlinkEndpoint) super.getEndpoint();
+    }
+
+    protected void collectResults(Exchange exchange, Object result) {
+        if (result instanceof DataStream) {
+            DataStream dsResults = (DataStream) result;
+
+            if (getEndpoint().isCollect()) {
+                throw new IllegalArgumentException("collect mode not supported 
for Flink DataStreams.");
+            } else {
+                exchange.getIn().setBody(result);
+                
exchange.getIn().setHeader(FlinkConstants.FLINK_DATASTREAM_HEADER, result);
+            }
+        } else {
+            exchange.getIn().setBody(result);
+        }
+    }
+
+    protected DataStream resolveDataStream(Exchange exchange) {
+        if (exchange.getIn().getHeader(FlinkConstants.FLINK_DATASTREAM_HEADER) 
!= null) {
+            return (DataStream) 
exchange.getIn().getHeader(FlinkConstants.FLINK_DATASTREAM_HEADER);
+        } else if (getEndpoint().getDataStream() != null) {
+            return getEndpoint().getDataStream();
+        } else {
+            throw new IllegalArgumentException("No DataStream defined");
+        }
+    }
+
+    protected DataStreamCallback resolveDataStreamCallback(Exchange exchange) {
+        if 
(exchange.getIn().getHeader(FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER) != 
null) {
+            return (DataStreamCallback) 
exchange.getIn().getHeader(FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER);
+        } else if (getEndpoint().getDataStreamCallback() != null) {
+            return getEndpoint().getDataStreamCallback();
+        } else {
+            throw new IllegalArgumentException("Cannot resolve DataStream 
callback.");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
index 70c6c39..811ab5c 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
 
 /**
  * The flink component can be used to send DataSet or DataStream jobs to Apache Flink cluster.
@@ -29,6 +30,8 @@ public class FlinkComponent extends UriEndpointComponent {
 
     private DataSet ds;
     private DataSetCallback dataSetCallback;
+    private DataStream dataStream;
+    private DataStreamCallback dataStreamCallback;
 
     public FlinkComponent() {
         super(FlinkEndpoint.class);
@@ -44,6 +47,10 @@ public class FlinkComponent extends UriEndpointComponent {
         return ds;
     }
 
+    public DataStream getDataStream() {
+        return dataStream;
+    }
+
     /**
      * DataSet to compute against.
      */
@@ -51,14 +58,32 @@ public class FlinkComponent extends UriEndpointComponent {
         this.ds = ds;
     }
 
+    /**
+     * DataStream to compute against.
+     */
+    public void setDataStream(DataStream dataStream) {
+        this.dataStream = dataStream;
+    }
+
     public DataSetCallback getDataSetCallback() {
         return dataSetCallback;
     }
 
+    public DataStreamCallback getDataStreamCallback() {
+        return dataStreamCallback;
+    }
+
     /**
      * Function performing action against a DataSet.
      */
     public void setDataSetCallback(DataSetCallback dataSetCallback) {
         this.dataSetCallback = dataSetCallback;
     }
+
+    /**
+     * Function performing action against a DataStream.
+     */
+    public void setDataStreamCallback(DataStreamCallback dataStreamCallback) {
+        this.dataStreamCallback = dataStreamCallback;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java
index 6f43a16..1c72e99 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java
@@ -22,7 +22,10 @@ public final class FlinkConstants {
 
     public static final String FLINK_DATASET_CALLBACK_HEADER = 
"CamelFlinkDataSetCallback";
 
+    public static final String FLINK_DATASTREAM_HEADER = 
"CamelFlinkDataStream";
+
+    public static final String FLINK_DATASTREAM_CALLBACK_HEADER = 
"CamelFlinkDataStreamCallback";
+
     private FlinkConstants() {
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
index 396d7df..d5bd872 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java
@@ -25,6 +25,7 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
 
 /**
  * The flink component can be used to send DataSet jobs to Apache Flink cluster.
@@ -39,6 +40,13 @@ public class FlinkEndpoint extends DefaultEndpoint {
     private DataSet dataSet;
     @UriParam
     private DataSetCallback dataSetCallback;
+
+    @UriParam
+    private DataStream dataStream;
+
+    @UriParam
+    private DataStreamCallback dataStreamCallback;
+
     @UriParam(defaultValue = "true")
     private boolean collect = true;
 
@@ -64,8 +72,10 @@ public class FlinkEndpoint extends DefaultEndpoint {
     public Producer createProducer() throws Exception {
         if (endpointType == EndpointType.dataset) {
             return new DataSetFlinkProducer(this);
+        } else if (endpointType == EndpointType.datastream) {
+            return new DataStreamFlinkProducer(this);
         } else {
-            throw new UnsupportedOperationException("datastream not yet 
supported");
+            return null;
         }
     }
 
@@ -95,6 +105,10 @@ public class FlinkEndpoint extends DefaultEndpoint {
         return dataSet;
     }
 
+    public DataStream getDataStream() {
+        return dataStream;
+    }
+
     /**
      * DataSet to compute against.
      */
@@ -102,10 +116,21 @@ public class FlinkEndpoint extends DefaultEndpoint {
         this.dataSet = ds;
     }
 
+    /**
+     * DataStream to compute against.
+     */
+    public void setDataStream(DataStream ds) {
+        this.dataStream = ds;
+    }
+
     public DataSetCallback getDataSetCallback() {
         return dataSetCallback;
     }
 
+    public DataStreamCallback getDataStreamCallback() {
+        return dataStreamCallback;
+    }
+
     /**
      * Function performing action against a DataSet.
      */
@@ -113,6 +138,13 @@ public class FlinkEndpoint extends DefaultEndpoint {
         this.dataSetCallback = dataSetCallback;
     }
 
+    /**
+     * Function performing action against a DataStream.
+     */
+    public void setDataStreamCallback(DataStreamCallback dataStreamCallback) {
+        this.dataStreamCallback = dataStreamCallback;
+    }
+
     public boolean isCollect() {
         return collect;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java
----------------------------------------------------------------------
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java
index 927a57d..e85c9af 100644
--- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java
@@ -17,13 +17,19 @@
 package org.apache.camel.component.flink;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public final class Flinks {
 
     private Flinks() {
+
     }
 
     public static ExecutionEnvironment createExecutionEnvironment() {
         return ExecutionEnvironment.getExecutionEnvironment();
     }
+
+    public static StreamExecutionEnvironment 
createStreamExecutionEnvironment() {
+        return StreamExecutionEnvironment.getExecutionEnvironment();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataStreamCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataStreamCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataStreamCallback.java
new file mode 100644
index 0000000..493c5d8
--- /dev/null
+++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataStreamCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.flink;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+public abstract class VoidDataStreamCallback implements 
DataStreamCallback<Void> {
+
+    public abstract void doOnDataStream(DataStream ds, Object... payloads) 
throws Exception;
+
+    @Override
+    public Void onDataStream(DataStream ds, Object... payloads) throws 
Exception {
+        doOnDataStream(ds, payloads);
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java
index 4f3e863..c96796f 100644
--- a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java
+++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java
@@ -26,13 +26,17 @@ import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.Test;
 
 public class FlinkProducerTest extends CamelTestSupport {
 
     static ExecutionEnvironment executionEnvironment = 
Flinks.createExecutionEnvironment();
+    static StreamExecutionEnvironment streamExecutionEnvironment = 
Flinks.createStreamExecutionEnvironment();
 
-    String flinkUri = "flink:dataSet?dataSet=#myDataSet";
+    String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet";
+    String flinkDataStreamUri = "flink:dataStream?dataStream=#myDataStream";
 
     int numberOfLinesInTestFile = 19;
 
@@ -41,6 +45,7 @@ public class FlinkProducerTest extends CamelTestSupport {
         JndiRegistry registry = super.createRegistry();
 
         registry.bind("myDataSet", 
executionEnvironment.readTextFile("src/test/resources/testds.txt"));
+        registry.bind("myDataStream", 
streamExecutionEnvironment.readTextFile("src/test/resources/testds.txt"));
 
         registry.bind("countLinesContaining", new DataSetCallback() {
             @Override
@@ -57,7 +62,7 @@ public class FlinkProducerTest extends CamelTestSupport {
 
     @Test
     public void shouldExecuteDataSetCallback() {
-        Long linesCount = template.requestBodyAndHeader(flinkUri, null, 
FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() {
+        Long linesCount = template.requestBodyAndHeader(flinkDataSetUri, null, 
FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() {
             @Override
             public Object onDataSet(DataSet ds, Object... payloads) {
                 try {
@@ -73,7 +78,7 @@ public class FlinkProducerTest extends CamelTestSupport {
 
     @Test
     public void shouldExecuteDataSetCallbackWithSinglePayload() {
-        Long linesCount = template.requestBodyAndHeader(flinkUri, 10, 
FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() {
+        Long linesCount = template.requestBodyAndHeader(flinkDataSetUri, 10, 
FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() {
             @Override
             public Object onDataSet(DataSet ds, Object... payloads) {
                 try {
@@ -89,7 +94,7 @@ public class FlinkProducerTest extends CamelTestSupport {
 
     @Test
     public void shouldExecuteDataSetCallbackWithPayloads() {
-        Long linesCount = template.requestBodyAndHeader(flinkUri, 
Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, 
new DataSetCallback() {
+        Long linesCount = template.requestBodyAndHeader(flinkDataSetUri, 
Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, 
new DataSetCallback() {
             @Override
             public Object onDataSet(DataSet ds, Object... payloads) {
                 try {
@@ -105,7 +110,7 @@ public class FlinkProducerTest extends CamelTestSupport {
 
     @Test
     public void shouldUseTransformationFromRegistry() {
-        Long linesCount = template.requestBody(flinkUri + 
"&dataSetCallback=#countLinesContaining", null, Long.class);
+        Long linesCount = template.requestBody(flinkDataSetUri + 
"&dataSetCallback=#countLinesContaining", null, Long.class);
         Truth.assertThat(linesCount).isGreaterThan(0L);
     }
 
@@ -114,7 +119,7 @@ public class FlinkProducerTest extends CamelTestSupport {
         final File output = File.createTempFile("camel", "flink");
         output.delete();
 
-        template.sendBodyAndHeader(flinkUri, null, 
FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new VoidDataSetCallback() {
+        template.sendBodyAndHeader(flinkDataSetUri, null, 
FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new VoidDataSetCallback() {
             @Override
             public void doOnDataSet(DataSet ds, Object... payloads) {
                 ds.writeAsText(output.getAbsolutePath());
@@ -137,7 +142,7 @@ public class FlinkProducerTest extends CamelTestSupport {
             }
         });
 
-        long pomLinesCount = template.requestBodyAndHeader(flinkUri, null, 
FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, Long.class);
+        long pomLinesCount = template.requestBodyAndHeader(flinkDataSetUri, 
null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, 
Long.class);
 
         Truth.assertThat(pomLinesCount).isEqualTo(19);
     }
@@ -154,7 +159,7 @@ public class FlinkProducerTest extends CamelTestSupport {
             }
         });
 
-        template.sendBodyAndHeader(flinkUri, null, 
FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback);
+        template.sendBodyAndHeader(flinkDataSetUri, null, 
FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback);
 
         Truth.assertThat(output.length()).isAtLeast(0L);
     }
@@ -172,7 +177,22 @@ public class FlinkProducerTest extends CamelTestSupport {
             }
         });
 
-        long pomLinesCount = template.requestBodyAndHeader(flinkUri, 
Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, 
dataSetCallback, Long.class);
+        long pomLinesCount = template.requestBodyAndHeader(flinkDataSetUri, 
Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, 
dataSetCallback, Long.class);
         Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 
* 10);
     }
+
+    @Test
+    public void shouldExecuteVoidDataStreamCallback() throws IOException {
+        final File output = File.createTempFile("camel", "flink");
+        output.delete();
+
+        template.sendBodyAndHeader(flinkDataStreamUri, null, 
FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, new VoidDataStreamCallback() {
+            @Override
+            public void doOnDataStream(DataStream ds, Object... payloads) 
throws Exception {
+                ds.writeAsText(output.getAbsolutePath());
+            }
+        });
+
+        Truth.assertThat(output.length()).isAtLeast(0L);
+    }
 }
\ No newline at end of file

Reply via email to