jamesnetherton commented on code in PR #6422:
URL: https://github.com/apache/camel-quarkus/pull/6422#discussion_r1743651281


##########
integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java:
##########
@@ -16,19 +16,52 @@
  */
 package org.apache.camel.quarkus.component.flink.it;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
 import org.junit.jupiter.api.Test;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
 @QuarkusTest
 class FlinkTest {
 
     @Test
-    public void loadComponentFlink() {
-        /* A simple autogenerated test */
-        RestAssured.get("/flink/load/component/flink")
+    public void dataSetCallback() throws IOException {
+        Path path = Files.createTempFile("fileDataSet", ".txt");
+        String text = "foo\n"
+                + "bar\n"
+                + "baz\n"
+                + "qux\n"
+                + "quux";
+        Files.write(path, text.getBytes(StandardCharsets.UTF_8));

Review Comment:
   ```suggestion
           Files.writeString(path, text);
   ```



##########
integration-tests-jvm/flink/src/main/java/org/apache/camel/quarkus/component/flink/it/FlinkResource.java:
##########
@@ -33,18 +48,69 @@ public class FlinkResource {
     private static final Logger LOG = Logger.getLogger(FlinkResource.class);
 
     private static final String COMPONENT_FLINK = "flink";
+
     @Inject
     CamelContext context;
 
-    @Path("/load/component/flink")
-    @GET
+    @Inject
+    ProducerTemplate template;
+
+    String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet";
+    String flinkDataStreamUri = "flink:datastream?datastream=#myDataStream";
+
+    @Path("/dataset/{fileName}")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
     @Produces(MediaType.TEXT_PLAIN)
-    public Response loadComponentFlink() throws Exception {
-        /* This is an autogenerated test */
-        if (context.getComponent(COMPONENT_FLINK) != null) {
-            return Response.ok().build();
+    public Response dataSetFromTextFile(@PathParam("fileName") String 
fileName) {
+
+        if (Files.exists(Paths.get(fileName))) {
+            ExecutionEnvironment env = Flinks.createExecutionEnvironment();
+            DataSet<String> myDataSet = env.readTextFile(fileName);
+            context.getRegistry().bind("myDataSet", myDataSet);
+            context.getRegistry().bind("countTotal", addDataSetCallback());
+            Long totalCount = template.requestBody(
+                    flinkDataSetUri + "&dataSetCallback=#countTotal", null, 
Long.class);
+            return Response.ok(totalCount).build();
         }
-        LOG.warnf("Could not load [%s] from the Camel context", 
COMPONENT_FLINK);
-        return Response.status(500, COMPONENT_FLINK + " could not be loaded 
from the Camel context").build();
+
+        return Response.status(Response.Status.NOT_FOUND).build();
     }
+
+    @Path("/datastream")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response loadStream(String data) throws IOException {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<String> datastream = env.fromElements(data);
+        context.getRegistry().bind("myDataStream", datastream);
+        java.nio.file.Path output;
+        output = Files.createTempFile("camel", "txt");

Review Comment:
   Instead of creating the file here, it would be better to follow the 
`dataSetCallback` test: create the file (and later delete it) in the test 
method and pass its path to the JAX-RS endpoint.



##########
extensions-jvm/flink/runtime/pom.xml:
##########
@@ -38,18 +38,53 @@
         <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-core</artifactId>
+        </dependency>
+         <dependency>
+         <groupId>org.apache.camel.quarkus</groupId>
+        <artifactId>camel-quarkus-flink-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-jackson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-netty</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-flink</artifactId>
+            <exclusions>
+            <!-- replaced by camel-quarkus-flink-client -->
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-clients</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-streaming-java</artifactId>
+                </exclusion>
+            </exclusions> 
         </dependency>
     </dependencies>
 
     <build>
         <plugins>
-            <plugin>
+                <plugin>
                 <groupId>io.quarkus</groupId>
                 <artifactId>quarkus-extension-maven-plugin</artifactId>
+                <configuration>
+                    <!--Flink uses Inverted Class Loading which causing issue 
for the Quarkus classloading mechanism -->
+                    <parentFirstArtifacts>
+                        
<parentFirstArtifact>org.apache.flink:flink-core</parentFirstArtifact> 
+                        
<parentFirstArtifact>org.apache.flink:flink-rpc-core</parentFirstArtifact>
+                        
<parentFirstArtifact>org.apache.flink:flink-runtime</parentFirstArtifact>
+                    </parentFirstArtifacts>
+                </configuration>

Review Comment:
   Please fix up the indentation.



##########
integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java:
##########
@@ -16,19 +16,52 @@
  */
 package org.apache.camel.quarkus.component.flink.it;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
 import org.junit.jupiter.api.Test;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
 @QuarkusTest
 class FlinkTest {
 
     @Test
-    public void loadComponentFlink() {
-        /* A simple autogenerated test */
-        RestAssured.get("/flink/load/component/flink")
+    public void dataSetCallback() throws IOException {
+        Path path = Files.createTempFile("fileDataSet", ".txt");

Review Comment:
   Maybe have a `finally` block where you delete the file if it exists.



##########
integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java:
##########
@@ -16,19 +16,52 @@
  */
 package org.apache.camel.quarkus.component.flink.it;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
 import org.junit.jupiter.api.Test;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
 @QuarkusTest
 class FlinkTest {
 
     @Test
-    public void loadComponentFlink() {
-        /* A simple autogenerated test */
-        RestAssured.get("/flink/load/component/flink")
+    public void dataSetCallback() throws IOException {
+        Path path = Files.createTempFile("fileDataSet", ".txt");
+        String text = "foo\n"
+                + "bar\n"
+                + "baz\n"
+                + "qux\n"
+                + "quux";
+        Files.write(path, text.getBytes(StandardCharsets.UTF_8));
+        RestAssured.given()
+                .contentType(ContentType.TEXT)
+                .post("/flink/dataset/{fileName}", 
path.toAbsolutePath().toString())
+                .then()
+                .statusCode(200)
+                .and()
+                .body(greaterThanOrEqualTo("5"));
+
+    }
+
+    @Test
+    public void dataStreamCallback() throws IOException {
+        String text = "Hello!!Camel flink!";
+        RestAssured.given()
+                .contentType(ContentType.TEXT)
+                .body(text)
+                .post("/flink/datastream")
                 .then()
-                .statusCode(200);
+                .statusCode(200)
+                .and()
+                .body(greaterThanOrEqualTo("1"));

Review Comment:
   I assume we assert `greaterThanOrEqualTo("1")` because Flink is still busy 
doing work asynchronously? 
   
   Do we know what the length of the file content should be?
   
   If so, it might be better to use Awaitility here and just wait until the 
file size is what we expect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to