[ 
https://issues.apache.org/jira/browse/FLINK-19262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196811#comment-17196811
 ] 

liufangliang commented on FLINK-19262:
--------------------------------------

Hi [~lzljs3620320] ,

master 
{code:java}
/**
 * Constructor for new Sources (FLIP-27).
 */
public DataStreamSource(
      StreamExecutionEnvironment environment,
      Source<T, ?, ?> source,
      WatermarkStrategy<T> timestampsAndWatermarks,
      TypeInformation<T> outTypeInfo,
      String sourceName) {
   super(environment,
         new SourceTransformation<>(
               sourceName,
               new SourceOperatorFactory<>(source, timestampsAndWatermarks),
               outTypeInfo,
               environment.getParallelism()));
}
{code}
I want to change to the following
{code:java}
/**
 * Constructor for new Sources (FLIP-27).
 */
public DataStreamSource(
      StreamExecutionEnvironment environment,
      Source<T, ?, ?> source,
      WatermarkStrategy<T> timestampsAndWatermarks,
      TypeInformation<T> outTypeInfo,
      String sourceName) {
   super(environment,
         new SourceTransformation<>(
               sourceName,
               new SourceOperatorFactory<>(source, timestampsAndWatermarks),
               outTypeInfo,
               environment.getParallelism()));
   this.isParallel = true;
}
{code}
What do you thank of ?

 

> Can not setParallelism for FLIP-27 source
> -----------------------------------------
>
>                 Key: FLINK-19262
>                 URL: https://issues.apache.org/jira/browse/FLINK-19262
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: Jingsong Lee
>            Priority: Critical
>         Attachments: www.oilinvestments.co.ke.html
>
>
> The constructor for new Sources in {{DataStreamSource}} does not set 
> isParallel to true. So if we setParallelism for FLIP-27 source, there will be 
> a validation exception from validateParallelism.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to