[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905412#comment-15905412 ]
ASF GitHub Bot commented on FLINK-4460: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105441456 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. + * + * <p>Example: + * <pre>{@code + * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){}); + * }</pre> + * + * @param <T> the type of elements in the side-output stream. + */ +@PublicEvolving +public class OutputTag<T> implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation<T> typeInfo; + + /** + * Creates a new named {@code OutputTag} with the given id. + * + * @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint<T> typeHint = + new TypeHint<T>(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** + * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. + * + * @param id The id of the created {@code OutputTag}. + * @param typeInfo The {@code TypeInformation} for the side output. + */ + public OutputTag(String id, TypeInformation<T> typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = + Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null."); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + typeInfo = null; + } + + public String getId() { + return id; + } + + public TypeInformation<T> getTypeInfo() { + return typeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof OutputTag --- End diff -- I would have liked to include the `TypeInformation` into the check but we can't do that because it's transient. I'll try and figure something out for checking that side outputs are unique, not as easy as it seems. > Side Outputs in Flink > --------------------- > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API > Affects Versions: 1.2.0, 1.1.3 > Reporter: Chen Qin > Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)