[FLINK-6551] Reject empty OutputTag names

This closes #3953.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9e75ff0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9e75ff0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9e75ff0

Branch: refs/heads/master
Commit: b9e75ff0efeb0fe604f7e55b48291de1e9f6a752
Parents: 0403563
Author: zentol <[email protected]>
Authored: Fri May 19 16:20:05 2017 +0200
Committer: zentol <[email protected]>
Committed: Sat May 20 16:05:03 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/util/OutputTag.java   |  8 +++-
 .../org/apache/flink/util/OutputTagTest.java    | 44 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9e75ff0/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/OutputTag.java 
b/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
index 785855f..800c0b5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
+++ b/flink-core/src/main/java/org/apache/flink/util/OutputTag.java
@@ -56,7 +56,9 @@ public class OutputTag<T> implements Serializable {
         * @param id The id of the created {@code OutputTag}.
      */
        public OutputTag(String id) {
-               this.id = Preconditions.checkNotNull(id, "OutputTag id cannot 
be null.");
+               Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+               Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must 
not be empty.");
+               this.id = id;
 
                try {
                        TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, 
this, 0) {};
@@ -74,7 +76,9 @@ public class OutputTag<T> implements Serializable {
         * @param typeInfo The {@code TypeInformation} for the side output.
         */
        public OutputTag(String id, TypeInformation<T> typeInfo) {
-               this.id = Preconditions.checkNotNull(id, "OutputTag id cannot 
be null.");
+               Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+               Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must 
not be empty.");
+               this.id = id;
                this.typeInfo = Preconditions.checkNotNull(typeInfo, 
"TypeInformation cannot be null.");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9e75ff0/flink-core/src/test/java/org/apache/flink/util/OutputTagTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/OutputTagTest.java 
b/flink-core/src/test/java/org/apache/flink/util/OutputTagTest.java
new file mode 100644
index 0000000..1caa5b2
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/OutputTagTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.junit.Test;
+
+public class OutputTagTest {
+
+       @Test(expected = NullPointerException.class)
+       public void testNullRejected() {
+               new OutputTag<Integer>(null);
+       }
+
+       @Test(expected = NullPointerException.class)
+       public void testNullRejectedWithTypeInfo() {
+               new OutputTag<>(null, BasicTypeInfo.INT_TYPE_INFO);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testEmptyStringRejected() {
+               new OutputTag<Integer>("");
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testEmptyStringRejectedWithTypeInfo() {
+               new OutputTag<>("", BasicTypeInfo.INT_TYPE_INFO);
+       }
+}

Reply via email to