[
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905414#comment-15905414
]
ASF GitHub Bot commented on FLINK-4460:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/3484#discussion_r105441679
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){};
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String id;
+
+ private transient TypeInformation<T> typeInfo;
+
+ /**
+ * Creates a new named {@code OutputTag} with the given id.
+ *
+ * @param id The id of the created {@code OutputTag}.
+ */
+ public OutputTag(String id) {
+ Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+ this.id = requireNonNull(id);
+
+ try {
+ TypeHint<T> typeHint =
+ new TypeHint<T>(OutputTag.class, this, 0) {};
+ this.typeInfo = typeHint.getTypeInfo();
+ } catch (InvalidTypesException e) {
+ throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+ "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+ }
+ }
+
+ /**
+ * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+ *
+ * @param id The id of the created {@code OutputTag}.
+ * @param typeInfo The {@code TypeInformation} for the side output.
+ */
+ public OutputTag(String id, TypeInformation<T> typeInfo) {
+ this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+ this.typeInfo =
+ Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ typeInfo = null;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public TypeInformation<T> getTypeInfo() {
+ return typeInfo;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof OutputTag
--- End diff --
I see. The problem is that if this does not work, then we can have
important side effects.
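For context on the Javadoc's requirement that an OutputTag be an anonymous inner class: the sketch below shows, in plain Java with no Flink dependency, why a subclass is needed to recover the generic type argument at runtime. The names TypeTokenSketch and Tag are hypothetical stand-ins, and the reflection used here only illustrates the general idea; it is not the TypeHint-based extraction that the diff above uses.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Objects;

public class TypeTokenSketch {

    /** Hypothetical stand-in for OutputTag; not Flink code. */
    public static class Tag<T> {
        private final String id;
        private final Type type;

        public Tag(String id) {
            this.id = Objects.requireNonNull(id, "id cannot be null");
            // An anonymous subclass records its generic superclass, e.g.
            // Tag<List<String>>, in its class file. A plain "new Tag<>(...)"
            // has Object as its superclass, so the argument is erased.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException(
                    "Cannot determine the type argument; create the tag as an anonymous subclass.");
            }
            this.type = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        public String getId() {
            return id;
        }

        public Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        // Trailing {} creates the anonymous subclass that keeps the type argument.
        Tag<List<String>> tag = new Tag<List<String>>("late-data") {};
        System.out.println(tag.getId() + " -> " + tag.getType().getTypeName());

        try {
            // Without {} there is no subclass, so the constructor rejects the tag.
            new Tag<List<String>>("late-data");
        } catch (IllegalStateException e) {
            System.out.println("plain instance rejected: " + e.getMessage());
        }
    }
}
```

The second call mirrors the situation the InvalidTypesException message in the diff warns about: without the trailing {} the type argument is not available for inspection.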
> Side Outputs in Flink
> ---------------------
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
> Issue Type: New Feature
> Components: Core, DataStream API
> Affects Versions: 1.2.0, 1.1.3
> Reporter: Chen Qin
> Assignee: Chen Qin
> Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing