This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 8ee0465  [FLINK-11227] [table] Fix bound checking errors in DescriptorProperties
8ee0465 is described below

commit 8ee0465761d58eb7fd96846c09614410d644ed65
Author: Xingcan Cui <[email protected]>
AuthorDate: Tue Dec 25 12:03:39 2018 +0800

    [FLINK-11227] [table] Fix bound checking errors in DescriptorProperties
    
    This closes #7373.
---
 .../table/descriptors/DescriptorProperties.java    |  4 +-
 .../descriptors/DescriptorPropertiesTest.scala     | 45 ++++++++++++++++++++--
 2 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index c476981..7628912 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -938,7 +938,7 @@ public class DescriptorProperties {
                }
 
                // validate
-               for (int i = 0; i < maxIndex; i++) {
+               for (int i = 0; i <= maxIndex; i++) {
                        for (Map.Entry<String, Consumer<String>> subKey : subKeyValidation.entrySet()) {
                                final String fullKey = key + '.' + i + '.' + subKey.getKey();
                                if (properties.containsKey(fullKey)) {
@@ -1134,7 +1134,7 @@ public class DescriptorProperties {
                }
 
                // validate array elements
-               for (int i = 0; i < maxIndex; i++) {
+               for (int i = 0; i <= maxIndex; i++) {
                        final String fullKey = key + '.' + i;
                        if (properties.containsKey(fullKey)) {
                                // run validation logic
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala
index b2a8ec9..fe7c75d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.descriptors
 
 import java.util
 import java.util.Collections
+import java.util.function.Consumer
 
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
@@ -32,6 +33,9 @@ import org.junit.Test
 class DescriptorPropertiesTest {
 
   private val ARRAY_KEY = "my-array"
+  private val FIXED_INDEXED_PROPERTY_KEY = "my-fixed-indexed-property"
+  private val PROPERTY_1_KEY = "property-1"
+  private val PROPERTY_2_KEY = "property-2"
 
   @Test
   def testEquals(): Unit = {
@@ -97,8 +101,8 @@ class DescriptorPropertiesTest {
   def testArrayInvalidValues(): Unit = {
     val properties = new DescriptorProperties()
     properties.putString(s"$ARRAY_KEY.0", "12")
-    properties.putString(s"$ARRAY_KEY.1", "INVALID")
-    properties.putString(s"$ARRAY_KEY.2", "66")
+    properties.putString(s"$ARRAY_KEY.1", "66")
+    properties.putString(s"$ARRAY_KEY.2", "INVALID")
 
     testArrayValidation(properties, 1, Integer.MAX_VALUE)
   }
@@ -118,6 +122,19 @@ class DescriptorPropertiesTest {
     testArrayValidation(properties, 1, Integer.MAX_VALUE)
   }
 
+  @Test(expected = classOf[ValidationException])
+  def testInvalidFixedIndexedProperties(): Unit = {
+    val property = new DescriptorProperties()
+    val list = new util.ArrayList[util.List[String]]()
+    list.add(util.Arrays.asList("1", "string"))
+    list.add(util.Arrays.asList("INVALID", "string"))
+    property.putIndexedFixedProperties(
+      FIXED_INDEXED_PROPERTY_KEY,
+      util.Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY),
+      list)
+    testFixedIndexedPropertiesValidation(property)
+  }
+
   @Test
   def testRemoveKeys(): Unit = {
     val properties = new DescriptorProperties()
@@ -155,7 +172,7 @@ class DescriptorPropertiesTest {
       minLength: Int,
       maxLength: Int)
     : Unit = {
-    val validator: (String) => Unit = (key: String) => {
+    val validator: String => Unit = (key: String) => {
       properties.validateInt(key, false)
     }
 
@@ -165,4 +182,26 @@ class DescriptorPropertiesTest {
       minLength,
       maxLength)
   }
+
+  private def testFixedIndexedPropertiesValidation(properties: DescriptorProperties): Unit = {
+
+    val validatorMap = new util.HashMap[String, Consumer[String]]()
+
+    // PROPERTY_1 should be Int
+    val validator1: String => Unit = (key: String) => {
+      properties.validateInt(key, false)
+    }
+    validatorMap.put(PROPERTY_1_KEY, toJava(validator1))
+    // PROPERTY_2 should be String
+    val validator2: String => Unit = (key: String) => {
+      properties.validateString(key, false)
+    }
+    validatorMap.put(PROPERTY_2_KEY, toJava(validator2))
+
+    properties.validateFixedIndexedProperties(
+      FIXED_INDEXED_PROPERTY_KEY,
+      false,
+      validatorMap
+    )
+  }
 }

Reply via email to