[
https://issues.apache.org/jira/browse/FLINK-2631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14734638#comment-14734638
]
ASF GitHub Bot commented on FLINK-2631:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1101#discussion_r38913819
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Stream operators can implement this interface if they need access to the output type information
+ * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for
+ * cases where the output type is specified by the returns method and, thus, after the stream
+ * operator has been created.
+ */
+public interface OutputTypeConfigurable {
+
+ /**
+ * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)}
+ * method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
+ * method is called with the output {@link TypeInformation} which is also used for the
+ * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.
+ *
+ * @param outTypeInfo Output type information of the {@link org.apache.flink.streaming.runtime.tasks.StreamTask}
+ * @param executionConfig Execution configuration
+ */
+ void setOutputType(TypeInformation<?> outTypeInfo, ExecutionConfig executionConfig);
--- End diff --
Good point. Will change it.
> StreamFold operator does not respect returns type and stores non serializable
> values
> ------------------------------------------------------------------------------------
>
> Key: FLINK-2631
> URL: https://issues.apache.org/jira/browse/FLINK-2631
> Project: Flink
> Issue Type: Bug
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> The {{StreamFold}} operator stores the initial value of the fold operation
> for the task deployment. This value does not necessarily have to be
> serializable. Thus, using the fold operation with a non-serializable initial
> value will fail the job.
> Moreover, the {{StreamFold}} operator needs to know the output type in order
> to create a {{TypeSerializer}}. For {{StreamGraphs}} where the output type is
> not known when the operator is created, as is the case for the Scala
> DataStream API, which sets the output type via the {{returns}} method only
> after the operator has been created, this approach will fail. The reason is
> that the {{StreamFold}} operator does not receive the type information set by
> the {{returns}} method. Therefore, the job will fail at runtime because the
> operator tries to create a serializer from a {{MissingTypeInformation}}.
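
The two problems described above (a non-serializable initial value, and an output type that is only known after the operator exists) can be sketched together in plain Java. This is a minimal illustration and not the code from the pull request: SketchSerializer, SketchTypeInfo, SketchOutputTypeConfigurable, Counter and SketchFoldOperator are hypothetical stand-ins invented here so the example runs without Flink on the classpath, and the real TypeInformation/TypeSerializer signatures differ. The idea shown is that the operator waits for the output type callback, turns the initial value into bytes with the serializer derived from that type, and restores it from those bytes when it is opened.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.BiFunction;

// Hypothetical stand-in for a type serializer; NOT Flink's TypeSerializer API.
interface SketchSerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;
    T deserialize(DataInputStream in) throws IOException;
}

// Hypothetical stand-in for type information: it only creates a serializer.
interface SketchTypeInfo<T> {
    SketchSerializer<T> createSerializer();
}

// Plays the role of OutputTypeConfigurable from the diff, with simplified types.
interface SketchOutputTypeConfigurable<T> {
    void setOutputType(SketchTypeInfo<T> outTypeInfo);
}

// A value type that deliberately does not implement java.io.Serializable.
final class Counter {
    final int count;
    Counter(int count) { this.count = count; }
}

final class CounterTypeInfo implements SketchTypeInfo<Counter> {
    @Override
    public SketchSerializer<Counter> createSerializer() {
        return new SketchSerializer<Counter>() {
            @Override
            public void serialize(Counter value, DataOutputStream out) throws IOException {
                out.writeInt(value.count);
            }
            @Override
            public Counter deserialize(DataInputStream in) throws IOException {
                return new Counter(in.readInt());
            }
        };
    }
}

final class SketchFoldOperator<T> implements SketchOutputTypeConfigurable<T> {
    private final BiFunction<T, Integer, T> folder;
    private T initialValue;                 // raw object, dropped once serialized
    private byte[] serializedInitialValue;  // the form that would be shipped
    private SketchSerializer<T> serializer;
    private T accumulator;

    SketchFoldOperator(T initialValue, BiFunction<T, Integer, T> folder) {
        this.initialValue = initialValue;
        this.folder = folder;
    }

    // Called once the output type is known, i.e. after the operator was created.
    @Override
    public void setOutputType(SketchTypeInfo<T> outTypeInfo) {
        serializer = outTypeInfo.createSerializer();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            serializer.serialize(initialValue, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        serializedInitialValue = bytes.toByteArray();
        initialValue = null;
    }

    // Restores the initial value from its byte form before folding starts.
    public void open() {
        if (serializedInitialValue == null) {
            throw new IllegalStateException("setOutputType was not called");
        }
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(serializedInitialValue))) {
            accumulator = serializer.deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public T fold(int value) {
        accumulator = folder.apply(accumulator, value);
        return accumulator;
    }
}

class FoldSketchDemo {
    public static void main(String[] args) {
        SketchFoldOperator<Counter> op =
                new SketchFoldOperator<>(new Counter(10), (acc, v) -> new Counter(acc.count + v));
        op.setOutputType(new CounterTypeInfo()); // type arrives after construction
        op.open();
        System.out.println(op.fold(5).count);
    }
}
```

In this sketch, calling open() before setOutputType() fails fast with an IllegalStateException, which is the analogue of the operator never receiving the type set by {{returns}}.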