This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 16ba7ff9fd74f5cd0d30c4dce3fb7a8c0a9ed57e Author: Andrea Cosentino <[email protected]> AuthorDate: Wed Jul 24 09:14:32 2019 +0200 Fixed CS for Camel-Flink --- .../camel/component/flink/FlinkProducerTest.java | 25 +++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java index 409d85f..944aa4e 100644 --- a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java +++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java @@ -38,20 +38,20 @@ public class FlinkProducerTest extends CamelTestSupport { static ExecutionEnvironment executionEnvironment = Flinks.createExecutionEnvironment(); static StreamExecutionEnvironment streamExecutionEnvironment = Flinks.createStreamExecutionEnvironment(); + String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet"; + String flinkDataStreamUri = "flink:dataStream?dataStream=#myDataStream"; + + int numberOfLinesInTestFile = 19; + @BindToRegistry("myDataSet") private DataSource<String> ds = executionEnvironment.readTextFile("src/test/resources/testds.txt"); - + @BindToRegistry("myDataStream") private DataStreamSource<String> dss = streamExecutionEnvironment.readTextFile("src/test/resources/testds.txt"); - String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet"; - String flinkDataStreamUri = "flink:dataStream?dataStream=#myDataStream"; - - int numberOfLinesInTestFile = 19; - @BindToRegistry("countLinesContaining") public DataSetCallback addDataSetCallback() { - return new DataSetCallback() { + return new DataSetCallback() { @Override public Object onDataSet(DataSet ds, Object... payloads) { try { @@ -85,7 +85,7 @@ public class FlinkProducerTest extends CamelTestSupport { @Override public Object onDataSet(DataSet ds, Object... payloads) { try { - return ds.count() * (int) payloads[0]; + return ds.count() * (int)payloads[0]; } catch (Exception e) { return null; } @@ -97,11 +97,11 @@ public class FlinkProducerTest extends CamelTestSupport { @Test public void shouldExecuteDataSetCallbackWithPayloads() { - Long linesCount = template.requestBodyAndHeader(flinkDataSetUri, Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { + Long linesCount = template.requestBodyAndHeader(flinkDataSetUri, Arrays.<Integer> asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { @Override public Object onDataSet(DataSet ds, Object... payloads) { try { - return ds.count() * (int) payloads[0] * (int) payloads[1]; + return ds.count() * (int)payloads[0] * (int)payloads[1]; } catch (Exception e) { return null; } @@ -180,7 +180,8 @@ public class FlinkProducerTest extends CamelTestSupport { } }); - long pomLinesCount = template.requestBodyAndHeader(flinkDataSetUri, Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, Long.class); + long pomLinesCount = template.requestBodyAndHeader(flinkDataSetUri, Arrays.<Integer> asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, + Long.class); Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); } @@ -198,4 +199,4 @@ public class FlinkProducerTest extends CamelTestSupport { Truth.assertThat(output.length()).isAtLeast(0L); } -} \ No newline at end of file +}
