This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 00ca13b  [FLINK-17882][table-common] Check for self references in 
structured types
00ca13b is described below

commit 00ca13b84f3eca1ced08c146c2c02a8e672ae7d4
Author: Timo Walther <[email protected]>
AuthorDate: Fri May 22 12:28:22 2020 +0200

    [FLINK-17882][table-common] Check for self references in structured types
    
    This closes #12294.
---
 .../table/types/extraction/DataTypeExtractor.java  |   2 +
 .../table/types/extraction/ExtractionUtils.java    |  15 +++
 .../types/extraction/DataTypeExtractorTest.java    | 102 +++++++++++++++++----
 3 files changed, 102 insertions(+), 17 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
index b025737..ff353fd 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
@@ -59,6 +59,7 @@ import static 
org.apache.flink.table.types.extraction.ExtractionUtils.resolveVar
 import static org.apache.flink.table.types.extraction.ExtractionUtils.toClass;
 import static 
org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass;
 import static 
org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredFieldReadability;
+import static 
org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredSelfReference;
 
 /**
  * Reflection-based utility that analyzes a given {@link 
java.lang.reflect.Type}, method, or class to
@@ -453,6 +454,7 @@ public final class DataTypeExtractor {
                }
 
                validateStructuredClass(clazz);
+               validateStructuredSelfReference(type, typeHierarchy);
 
                final List<Field> fields = collectStructuredFields(clazz);
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index 56e5266..e814fb6 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -451,6 +451,21 @@ public final class ExtractionUtils {
        }
 
        /**
+        * Validates if a given type is not already contained in the type 
hierarchy of a structured type.
+        *
+        * <p>Otherwise this would lead to infinite data type extraction cycles.
+        */
+       static void validateStructuredSelfReference(Type t, List<Type> 
typeHierarchy) {
+               final Class<?> clazz = toClass(t);
+               if (clazz != null && !clazz.isInterface() && clazz != 
Object.class && typeHierarchy.contains(t)) {
+                       throw extractionError(
+                               "Cyclic reference detected for class '%s'. 
Attributes of structured types must not " +
+                                       "(transitively) reference the 
structured type itself.",
+                               clazz.getName());
+               }
+       }
+
+       /**
         * Returns the fields of a class for a {@link StructuredType}.
         */
        static List<Field> collectStructuredFields(Class<?> clazz) {
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index eb82f5c..92d4949 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.logical.TypeInformationRawType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.utils.DataTypeFactoryMock;
@@ -71,6 +72,7 @@ import static org.junit.Assert.assertThat;
  * Tests for {@link DataTypeExtractor}.
  */
 @RunWith(Parameterized.class)
+@SuppressWarnings("unused")
 public class DataTypeExtractorTest {
 
        @SuppressWarnings({"unchecked", "rawtypes"})
@@ -398,12 +400,28 @@ public class DataTypeExtractorTest {
                                .forMethodOutput(IntegerVarArg.class)
                                .expectDataType(DataTypes.INT()),
 
-                       // structured type with invalid constructor
                        TestSpec
-                               .forType(SimplePojoWithInvalidConstructor.class)
+                               .forType(
+                                       "Structured type with invalid 
constructor",
+                                       SimplePojoWithInvalidConstructor.class)
                                .expectErrorMessage(
                                        "Class '" + 
SimplePojoWithInvalidConstructor.class.getName() + "' has neither a " +
-                                               "constructor that assigns all 
fields nor a default constructor.")
+                                               "constructor that assigns all 
fields nor a default constructor."),
+
+                       TestSpec
+                               .forType(
+                                       "Structured type with self reference",
+                                       PojoWithInvalidSelfReference.class)
+                               .expectErrorMessage(
+                                       "Cyclic reference detected for class '" 
+ PojoWithInvalidSelfReference.class.getName() + "'. Attributes " +
+                                               "of structured types must not 
(transitively) reference the structured type itself."),
+
+                       TestSpec
+                               .forType(
+                                       "Structured type with self reference 
that is avoided using RAW",
+                                       PojoWithRawSelfReference.class)
+                               .lookupExpects(PojoWithRawSelfReference.class)
+                               
.expectDataType(getPojoWithRawSelfReferenceDataType())
                );
        }
 
@@ -550,10 +568,10 @@ public class DataTypeExtractorTest {
                final StructuredType.Builder builder = 
StructuredType.newBuilder(simplePojoClass);
                builder.attributes(
                        Arrays.asList(
-                               new 
StructuredType.StructuredAttribute("intField", new IntType(true)),
-                               new 
StructuredType.StructuredAttribute("primitiveBooleanField", new 
BooleanType(false)),
-                               new 
StructuredType.StructuredAttribute("primitiveIntField", new IntType(false)),
-                               new 
StructuredType.StructuredAttribute("stringField", new 
VarCharType(VarCharType.MAX_LENGTH))));
+                               new StructuredAttribute("intField", new 
IntType(true)),
+                               new 
StructuredAttribute("primitiveBooleanField", new BooleanType(false)),
+                               new StructuredAttribute("primitiveIntField", 
new IntType(false)),
+                               new StructuredAttribute("stringField", new 
VarCharType(VarCharType.MAX_LENGTH))));
                builder.setFinal(true);
                builder.setInstantiable(true);
                final StructuredType structuredType = builder.build();
@@ -575,13 +593,13 @@ public class DataTypeExtractorTest {
                final StructuredType.Builder builder = 
StructuredType.newBuilder(complexPojoClass);
                builder.attributes(
                        Arrays.asList(
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "mapField",
                                        new MapType(new 
VarCharType(VarCharType.MAX_LENGTH), new IntType())),
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "simplePojoField",
                                        
getSimplePojoDataType(simplePojoClass).getLogicalType()),
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "someObject",
                                        new TypeInformationRawType<>(new 
GenericTypeInfo<>(Object.class)))));
                builder.setFinal(true);
@@ -604,13 +622,13 @@ public class DataTypeExtractorTest {
                final StructuredType.Builder builder = 
StructuredType.newBuilder(pojoClass);
                builder.attributes(
                        Arrays.asList(
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "z",
                                        new BigIntType()),
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "y",
                                        new BooleanType()),
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "x",
                                        new IntType())));
                builder.setFinal(true);
@@ -630,10 +648,10 @@ public class DataTypeExtractorTest {
                final StructuredType.Builder builder = 
StructuredType.newBuilder(Tuple2.class);
                builder.attributes(
                        Arrays.asList(
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "f0",
                                        new IntType()),
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "f1",
                                        
getInnerTupleDataType().getLogicalType())));
                builder.setFinal(true);
@@ -652,10 +670,10 @@ public class DataTypeExtractorTest {
                final StructuredType.Builder builder = 
StructuredType.newBuilder(Tuple2.class);
                builder.attributes(
                        Arrays.asList(
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "f0",
                                        new 
VarCharType(VarCharType.MAX_LENGTH)),
-                               new StructuredType.StructuredAttribute(
+                               new StructuredAttribute(
                                        "f1",
                                        new BooleanType())));
                builder.setFinal(true);
@@ -670,6 +688,28 @@ public class DataTypeExtractorTest {
                return new FieldsDataType(structuredType, Tuple2.class, 
fieldDataTypes);
        }
 
+       private static DataType getPojoWithRawSelfReferenceDataType() {
+               final StructuredType.Builder builder = 
StructuredType.newBuilder(PojoWithRawSelfReference.class);
+               builder.attributes(
+                       Arrays.asList(
+                               new StructuredAttribute(
+                                       "integer",
+                                       new IntType()),
+                               new StructuredAttribute(
+                                       "reference",
+                                       new TypeInformationRawType<>(new 
GenericTypeInfo<>(PojoWithRawSelfReference.class)))));
+               builder.setFinal(true);
+               builder.setInstantiable(true);
+               final StructuredType structuredType = builder.build();
+
+               final List<DataType> fieldDataTypes = Arrays.asList(
+                       DataTypes.INT(),
+                       DataTypes.RAW(new 
GenericTypeInfo<>(PojoWithRawSelfReference.class))
+               );
+
+               return new FieldsDataType(structuredType, 
PojoWithRawSelfReference.class, fieldDataTypes);
+       }
+
        // 
--------------------------------------------------------------------------------------------
        // Test classes for extraction
        // 
--------------------------------------------------------------------------------------------
@@ -970,4 +1010,32 @@ public class DataTypeExtractorTest {
                        this.intField = intField;
                }
        }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Self reference in attribute.
+        */
+       public static class PojoWithInvalidSelfReference {
+               public Integer integer;
+               public PojoWithInvalidSelfReferenceNested nestedPojo;
+       }
+
+       /**
+        * Nested POJO for self reference test.
+        */
+       public static class PojoWithInvalidSelfReferenceNested {
+               public PojoWithInvalidSelfReference reference;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Self reference in attribute that is fixed with RAW type.
+        */
+       public static class PojoWithRawSelfReference {
+               public Integer integer;
+               @DataTypeHint(value = "RAW", bridgedTo = 
PojoWithRawSelfReference.class)
+               public PojoWithRawSelfReference reference;
+       }
 }

Reply via email to