[ 
https://issues.apache.org/jira/browse/FLINK-2631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14734456#comment-14734456
 ] 

ASF GitHub Bot commented on FLINK-2631:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1101#discussion_r38903676
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +/**
    + * Stream operators can implement this interface if they need access to the output type information
    + * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for
    + * cases where the output type is specified by the returns method and, thus, after the stream
    + * operator has been created.
    + */
    +public interface OutputTypeConfigurable {
    +
    +   /**
    +    * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)}
    +    * method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
    +    * method is called with the output {@link TypeInformation} which is also used for the
    +    * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.
    +    *
    +    * @param outTypeInfo Output type information of the {@link org.apache.flink.streaming.runtime.tasks.StreamTask}
    +    * @param executionConfig Execution configuration
    +    */
    +   void setOutputType(TypeInformation<?> outTypeInfo, ExecutionConfig executionConfig);
    --- End diff --
    
    Why don't we type the interface, so that we don't need casts in the `setOutputType` implementations at the operator?
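
A minimal sketch of what typing the interface could look like, written without Flink on the classpath. `TypeInfo` and `Config` are hypothetical stand-ins for `TypeInformation` and `ExecutionConfig`, and `FoldOperator` is an illustrative implementer; none of this is taken from the pull request itself.

```java
public class TypedOutputSketch {

    /** Hypothetical stand-in for Flink's TypeInformation; it only carries the class. */
    static final class TypeInfo<T> {
        private final Class<T> typeClass;

        TypeInfo(Class<T> typeClass) {
            this.typeClass = typeClass;
        }

        Class<T> getTypeClass() {
            return typeClass;
        }
    }

    /** Hypothetical stand-in for ExecutionConfig. */
    static final class Config { }

    /** Typed variant of the interface: OUT replaces the wildcard from the diff. */
    interface OutputTypeConfigurable<OUT> {
        void setOutputType(TypeInfo<OUT> outTypeInfo, Config executionConfig);
    }

    /** Illustrative operator that stores the output type without casting. */
    static final class FoldOperator<IN, OUT> implements OutputTypeConfigurable<OUT> {
        private TypeInfo<OUT> outTypeInfo;

        @Override
        public void setOutputType(TypeInfo<OUT> outTypeInfo, Config executionConfig) {
            // The parameter already has the operator's OUT type, so no cast is needed.
            this.outTypeInfo = outTypeInfo;
        }

        TypeInfo<OUT> getOutTypeInfo() {
            return outTypeInfo;
        }
    }

    public static void main(String[] args) {
        FoldOperator<Integer, String> operator = new FoldOperator<>();
        operator.setOutputType(new TypeInfo<>(String.class), new Config());
        System.out.println(operator.getOutTypeInfo().getTypeClass().getSimpleName());
    }
}
```

With the `TypeInformation<?>` signature from the diff, the assignment inside `setOutputType` would instead need an unchecked cast to the operator's own output type.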


> StreamFold operator does not respect returns type and stores non serializable values
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-2631
>                 URL: https://issues.apache.org/jira/browse/FLINK-2631
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The {{StreamFold}} operator stores the initial value of the fold operation 
> for the task deployment. This value does not necessarily have to be 
> serializable. Thus, using the fold operation with a non-serializable initial 
> value will fail the job.
> Moreover, the {{StreamFold}} operator needs to know the output type in order 
> to create a {{TypeSerializer}}. For {{StreamGraphs}} where the output type is 
> not known when the operator is created, as is the case for the Scala 
> DataStream API, which sets the output type via the {{returns}} method 
> directly after creating the operator, this approach will fail. The reason is 
> that the {{StreamFold}} operator does not receive the type information set by 
> the {{returns}} method. Therefore, the job will fail at runtime because the 
> operator tries to create a serializer from a {{MissingTypeInformation}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
