[
https://issues.apache.org/jira/browse/FLINK-26576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lijie Wang updated FLINK-26576:
-------------------------------
Description:
In [StreamExecutionEnvironment#createFileInput|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B],
{{env.getParallelism()}} is passed to {{ContinuousFileMonitoringFunction}}
as the parallelism of the downstream readers. This value is wrong whenever the
user explicitly configures the parallelism of those readers.
For example, in the test below, *1* is passed as {{readerParallelism}}, but the
actual parallelism of the downstream readers is *5*.
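{code:java}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.junit.Test;

public class ReaderParallelismTest { // illustrative test class name

    @Test
    public void testContinuousFileMonitoringFunction() throws Exception {
        // Environment parallelism is 1, so createFileInput passes 1 as readerParallelism...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n";
        final File file = createTempFile(fileContent);
        // ...but setParallelism(5) applies to the split readers, so 5 readers actually run.
        env.readTextFile(file.getPath()).name("TextSource").setParallelism(5)
                .forward()
                .addSink(new PrintSinkFunction<>()).setParallelism(5);
        env.execute();
    }

    private File createTempFile(String content) throws IOException {
        File tempFile = File.createTempFile("test_contents", "tmp");
        tempFile.deleteOnExit();
        // try-with-resources closes the writer even if write() throws
        try (Writer wrt = new OutputStreamWriter(
                new FileOutputStream(tempFile), StandardCharsets.UTF_8)) {
            wrt.write(content);
        }
        return tempFile;
    }
}
{code}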
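The sketch below shows why the mismatch matters. It is a simplified, hypothetical model and does not reproduce Flink's code. It assumes that {{ContinuousFileMonitoringFunction}} uses {{readerParallelism}} as the minimum number of splits it requests from the input format, via {{FileInputFormat#createInputSplits(int)}}. The class {{SplitModel}}, its methods, and its equal-size splitting rule are illustrative only. The model ignores block sizes and minimum split sizes. Take the 30-byte file from the test above. In this model, requesting 1 split leaves 4 of the 5 readers without work, while requesting 5 splits gives each reader one split.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model (not Flink code) of split creation and reader assignment. */
public class SplitModel {

    /** Divides totalLength bytes into ranges of at most ceil(totalLength / minNumSplits) bytes. */
    static List<long[]> createSplits(long totalLength, int minNumSplits) {
        // Ceiling division so that at most minNumSplits ranges cover the whole file.
        long maxSplitSize = (totalLength + minNumSplits - 1) / minNumSplits;
        List<long[]> splits = new ArrayList<>();
        for (long start = 0; start < totalLength; start += maxSplitSize) {
            long length = Math.min(maxSplitSize, totalLength - start);
            splits.add(new long[] {start, length}); // {offset, length}
        }
        return splits;
    }

    /** Number of readers that receive no split when splits are handed out round-robin. */
    static int idleReaders(long totalLength, int requestedSplits, int actualReaders) {
        int numSplits = createSplits(totalLength, requestedSplits).size();
        return Math.max(0, actualReaders - numSplits);
    }

    public static void main(String[] args) {
        long fileLength = 30;  // "line1\n" .. "line5\n" is 5 * 6 bytes
        int actualReaders = 5; // reader parallelism set via setParallelism(5)
        for (int readerParallelism : new int[] {1, 5}) {
            System.out.printf("readerParallelism=%d -> splits=%d, idle readers=%d%n",
                    readerParallelism,
                    createSplits(fileLength, readerParallelism).size(),
                    idleReaders(fileLength, readerParallelism, actualReaders));
        }
    }
}
{code}

Running {{main}} prints one line per requested split count. The exact split boundaries Flink produces may differ from this model.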
was:
In [StreamExecutionEnvironment#createFileInput|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B],
the {{env.getParallelism()}} was passed to
{{ContinuousFileMonitoringFunction}} as the parallelism of downstream readers.
This value is incorrect when the parallelism of the downstream readers is
manually configured by the user.
For example, in the test below, *1* will be passed as
{{readerParallelism}}, but the actual parallelism of readers is *5*.
{code:java}
@Test
public void testContinuousFileMonitoringFunction() throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(1);
    final String fileContent = "line1\n" + "line2\n" + "line3\n" +
            "line4\n" + "line5\n";
    final File file = createTempFile(fileContent);
    env.readTextFile(file.getPath()).name("TextSource").setParallelism(5)
            .forward()
            .addSink(new PrintSinkFunction<>()).setParallelism(5);
    env.execute();
}

private File createTempFile(String content) throws IOException {
    File tempFile = File.createTempFile("test_contents", "tmp");
    tempFile.deleteOnExit();
    OutputStreamWriter wrt =
            new OutputStreamWriter(new FileOutputStream(tempFile),
                    StandardCharsets.UTF_8);
    wrt.write(content);
    wrt.close();
    return tempFile;
}
{code}
> The value of 'readerParallelism' passed to ContinuousFileMonitoringFunction
> is wrong
> ------------------------------------------------------------------------------------
>
> Key: FLINK-26576
> URL: https://issues.apache.org/jira/browse/FLINK-26576
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Reporter: Lijie Wang
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)