XComp commented on code in PR #23490: URL: https://github.com/apache/flink/pull/23490#discussion_r1368358256
########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then + * the serialized data. Used for schema evolution. Null when not needed. Review Comment: ```suggestion * the serialized data. Used for schema evolution or `null` if no schema evolution is applied for that record class. ``` nit: Sorry for being stingy here. I burned myself on this topic in the past when working on the FileSystem delete logic. :innocent: What about making the `null` contract more explicit. `is not needed` still sounds a bit ambiguous (even with the schema evolution being mentioned in the sentence before). Feel free to not follow it if you disagree ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { Review Comment: ```suggestion final class JavaRecordBuilderFactory<T> { ``` Essentially it's a factory for `JavaRecordBuilder` instances. WDYT? ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then + * the serialized data. Used for schema evolution. Null when not needed. + */ + @Nullable private final int[] paramIndexMapping; + /** + * Default record args used for newly introduced primitive fields during schema evolution. Null + * when not needed. + */ + @Nullable private final Object[] defaultConstructorArgs; + + private JavaRecordHelper(Constructor<T> recordConstructor) { + this(recordConstructor, null, null); Review Comment: ```suggestion this(recordConstructor, null, new Object[recordConstructor.getParameterCount()]); ``` Creating the defaultArgs even in this case would remove the necessity of doing `null` for that parameter. WDYT? ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ Review Comment: ```suggestion ``` nit: I think that this comment isn't adding any value. We could rename the member variable if you think that the word `canonical` is needed. Feel free to keep the comment if you disagree. ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then Review Comment: ```suggestion * Record constructor parameter in case the new constructor has a different parameter order than ``` nit: one typo PR less in the future :innocent: ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then + * the serialized data. Used for schema evolution. Null when not needed. + */ + @Nullable private final int[] paramIndexMapping; + /** + * Default record args used for newly introduced primitive fields during schema evolution. Null + * when not needed. + */ + @Nullable private final Object[] defaultConstructorArgs; + + private JavaRecordHelper(Constructor<T> recordConstructor) { + this(recordConstructor, null, null); + } + + private JavaRecordHelper( + Constructor<T> recordConstructor, + int[] argIndexMapping, + Object[] defaultConstructorArgs) { + Preconditions.checkArgument((argIndexMapping == null) == (defaultConstructorArgs == null)); + this.recordConstructor = recordConstructor; + this.paramIndexMapping = argIndexMapping; + this.defaultConstructorArgs = defaultConstructorArgs; + } + + public JavaRecordBuilder newBuilder() { Review Comment: ```suggestion public JavaRecordBuilder create() { ``` ...if we go with `JavaRecordBuilderFactory`.Usually, `newBuilder()` returns a Builder for the class itself (i.e. `JavaRecordHelper`). The Factory pattern make the relationship clearer. ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then + * the serialized data. Used for schema evolution. Null when not needed. + */ + @Nullable private final int[] paramIndexMapping; + /** + * Default record args used for newly introduced primitive fields during schema evolution. Null + * when not needed. + */ + @Nullable private final Object[] defaultConstructorArgs; + + private JavaRecordHelper(Constructor<T> recordConstructor) { + this(recordConstructor, null, null); + } + + private JavaRecordHelper( + Constructor<T> recordConstructor, + int[] argIndexMapping, + Object[] defaultConstructorArgs) { + Preconditions.checkArgument((argIndexMapping == null) == (defaultConstructorArgs == null)); + this.recordConstructor = recordConstructor; + this.paramIndexMapping = argIndexMapping; + this.defaultConstructorArgs = defaultConstructorArgs; + } + + public JavaRecordBuilder newBuilder() { + return new JavaRecordBuilder(); + } + + /** Builder class for incremental record construction. */ + @Internal + final class JavaRecordBuilder { + private final Object[] args; + + JavaRecordBuilder() { + if (defaultConstructorArgs == null) { + args = new Object[recordConstructor.getParameterCount()]; + } else { + args = Arrays.copyOf(defaultConstructorArgs, defaultConstructorArgs.length); + } + } + + T build() { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + void setParam(int i, Object field) { + if (paramIndexMapping != null) { + args[paramIndexMapping[i]] = field; + } else { + args[i] = field; + } + } + } + + public static <T> JavaRecordHelper<T> create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // before Java 14 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + // There may be (removed) null fields due to schema evolution + .filter(Objects::nonNull) + .map(Field::getName) + .collect(Collectors.toList()); + + // If the field names / order changed we know that we are migrating the records and arg + // index remapping may be necessary + boolean migrating = !previousFields.equals(componentNames); + if (migrating) { Review Comment: ```suggestion if (!previousFields.equals(componentNames)) { ``` we don't need the local variable anymore ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ########## @@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private transient ClassLoader cl; + private final boolean isRecord; Review Comment: ```suggestion private final boolean isRecord; ``` Do we gain anything from having this final field during deserialization instead of doing a `null` check on the transient field `recordHelper`? The constructor is called, anyway, which would initialize `recordHelper` properly based on the `clazz` parameter. ...or am I missing something here? :thinking: ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordHelperTest.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for the @{@link JavaRecordHelper}. */ +class Java17RecordHelperTest { + + Field[] fields; + + record TestRecord(int i1, int i2, String s1, String s2) {} + + @BeforeEach + void setup() { + fields = TestRecord.class.getDeclaredFields(); + } + + @Test + void testNoDefaultOrParamMapping() { + JavaRecordHelper<TestRecord> helper = JavaRecordHelper.create(TestRecord.class, fields); + JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setParam(1, 100); + builder.setParam(0, 50); Review Comment: ```suggestion builder.setParam(0, 50); builder.setParam(1, 100); ``` nit: maybe, let's not confuse the reader more than necessary by switching orders. The order doesn't matter here, does it? ########## pom.xml: ########## @@ -1102,6 +1103,12 @@ under the License. <profile> <id>java17-target</id> + + <!-- Include Java 17 specific tests (by not excluding them) --> + <properties> Review Comment: I see. Thanks for clarification :+1: ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ########## @@ -118,6 +122,9 @@ public PojoSerializer( createRegisteredSubclassSerializers(registeredSubclasses, executionConfig); this.subclassSerializerCache = new HashMap<>(); + if (this.isRecord = TypeExtractor.isRecord(clazz)) { Review Comment: Do we gain anything from having the final field `isRecord` during deserialization instead of doing a `null` check on the transient field `recordHelper`? AFAIU, the constructor is called, anyway, which would initialize `recordHelper` properly based on the clazz parameter. ...or am I missing something here? 🤔 ########## tools/maven/suppressions-core.xml: ########## @@ -121,4 +121,7 @@ under the License. <suppress files="(.*)test[/\\](.*)testutils[/\\](.*)" checks="RedundantModifier|JavadocParagraph|JavadocType|JavadocStyle|StaticVariableNameCheck|LocalFinalVariableName|EmptyLineSeparator"/> + + <!-- suppress check java 17 tests --> + <suppress files="(.*)test[/\\].*[/\\]Java17.*.java" checks=".*"/> Review Comment: Any opinion on specifying the test classes explicitly rather than using regex? ...since we don't have that many classes, yet :thinking: I'm fine with keeping it like it's now, though. ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordHelperTest.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for the @{@link JavaRecordHelper}. */ +class Java17RecordHelperTest { + + Field[] fields; + + record TestRecord(int i1, int i2, String s1, String s2) {} + + @BeforeEach + void setup() { + fields = TestRecord.class.getDeclaredFields(); + } + + @Test + void testNoDefaultOrParamMapping() { + JavaRecordHelper<TestRecord> helper = JavaRecordHelper.create(TestRecord.class, fields); + JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setParam(1, 100); + builder.setParam(0, 50); + builder.setParam(3, "test"); + + assertThat(builder.build()).isEqualTo(new TestRecord(50, 100, null, "test")); + } + + @Test + void testNewFieldsAdded() { + // Test restoring from fields [i2, s1] + JavaRecordHelper<TestRecord> helper = + JavaRecordHelper.create(TestRecord.class, Arrays.copyOfRange(fields, 1, 3)); + + JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setParam(0, 100); + builder.setParam(1, "test"); + + assertThat(builder.build()).isEqualTo(new TestRecord(0, 100, "test", null)); + } + + @Test + void testFieldsAddedRemovedAndRearranged() { + Field[] oldFields = new Field[] {fields[3], null, fields[0]}; + JavaRecordHelper<TestRecord> helper = JavaRecordHelper.create(TestRecord.class, oldFields); + + JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setParam(0, "test"); + builder.setParam(2, 100); + + assertThat(builder.build()).isEqualTo(new TestRecord(100, 0, null, "test")); + } + + @Test + void testReorderFields() { + // Swap first and last field + Field temp = fields[0]; + fields[0] = fields[3]; + fields[3] = temp; + + JavaRecordHelper<TestRecord> helper = JavaRecordHelper.create(TestRecord.class, fields); + + JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setParam(0, "4"); + builder.setParam(1, 2); + builder.setParam(2, "3"); + builder.setParam(3, 1); + + assertThat(builder.build()).isEqualTo(new TestRecord(1, 2, "3", "4")); + } + + @Test + void testMissingRequiredField() { + JavaRecordHelper<TestRecord> helper = JavaRecordHelper.create(TestRecord.class, fields); + JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + + builder.setParam(0, 50); + // Do not set required param 1 + + assertThatThrownBy(() -> builder.build()) Review Comment: ```suggestion assertThatThrownBy(builder::build) ``` nit ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then + * the serialized data. Used for schema evolution. Null when not needed. + */ + @Nullable private final int[] paramIndexMapping; + /** + * Default record args used for newly introduced primitive fields during schema evolution. Null + * when not needed. + */ + @Nullable private final Object[] defaultConstructorArgs; + + private JavaRecordHelper(Constructor<T> recordConstructor) { + this(recordConstructor, null, null); + } + + private JavaRecordHelper( + Constructor<T> recordConstructor, + int[] argIndexMapping, + Object[] defaultConstructorArgs) { + Preconditions.checkArgument((argIndexMapping == null) == (defaultConstructorArgs == null)); + this.recordConstructor = recordConstructor; + this.paramIndexMapping = argIndexMapping; + this.defaultConstructorArgs = defaultConstructorArgs; + } + + public JavaRecordBuilder newBuilder() { + return new JavaRecordBuilder(); + } + + /** Builder class for incremental record construction. */ + @Internal + final class JavaRecordBuilder { + private final Object[] args; + + JavaRecordBuilder() { + if (defaultConstructorArgs == null) { + args = new Object[recordConstructor.getParameterCount()]; + } else { + args = Arrays.copyOf(defaultConstructorArgs, defaultConstructorArgs.length); + } + } + + T build() { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + void setParam(int i, Object field) { + if (paramIndexMapping != null) { + args[paramIndexMapping[i]] = field; + } else { + args[i] = field; + } + } + } + + public static <T> JavaRecordHelper<T> create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // before Java 14 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + // There may be (removed) null fields due to schema evolution Review Comment: ```suggestion // There may be removed fields due to schema evolution still listed ``` again nitty: Maybe that's my English but I could read "removed null field" as the null field is already removed ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordSerializationHelper.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ Review Comment: Renaming the class to `JavaRecordBuilderFactory` would be good enough to address purpose of it without touching the JavaDoc, I guess :thinking: ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ########## @@ -118,6 +122,9 @@ public PojoSerializer( createRegisteredSubclassSerializers(registeredSubclasses, executionConfig); this.subclassSerializerCache = new HashMap<>(); + if (this.isRecord = TypeExtractor.isRecord(clazz)) { Review Comment: Do we gain anything from having the final field `isRecord` during deserialization instead of doing a `null` check on the transient field `recordHelper`? AFAIU, the constructor is called, anyway, which would initialize `recordHelper` properly based on the clazz parameter. ...or am I missing something here? 🤔 ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then + * the serialized data. Used for schema evolution. Null when not needed. + */ + @Nullable private final int[] paramIndexMapping; + /** + * Default record args used for newly introduced primitive fields during schema evolution. Null + * when not needed. + */ + @Nullable private final Object[] defaultConstructorArgs; + + private JavaRecordHelper(Constructor<T> recordConstructor) { + this(recordConstructor, null, null); + } + + private JavaRecordHelper( + Constructor<T> recordConstructor, + int[] argIndexMapping, + Object[] defaultConstructorArgs) { + Preconditions.checkArgument((argIndexMapping == null) == (defaultConstructorArgs == null)); + this.recordConstructor = recordConstructor; + this.paramIndexMapping = argIndexMapping; + this.defaultConstructorArgs = defaultConstructorArgs; + } + + public JavaRecordBuilder newBuilder() { + return new JavaRecordBuilder(); + } + + /** Builder class for incremental record construction. */ + @Internal + final class JavaRecordBuilder { + private final Object[] args; + + JavaRecordBuilder() { + if (defaultConstructorArgs == null) { + args = new Object[recordConstructor.getParameterCount()]; + } else { + args = Arrays.copyOf(defaultConstructorArgs, defaultConstructorArgs.length); + } + } + + T build() { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + void setParam(int i, Object field) { + if (paramIndexMapping != null) { + args[paramIndexMapping[i]] = field; + } else { + args[i] = field; + } + } + } + + public static <T> JavaRecordHelper<T> create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // before Java 14 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + // There may be (removed) null fields due to schema evolution + .filter(Objects::nonNull) + .map(Field::getName) + .collect(Collectors.toList()); + + // If the field names / order changed we know that we are migrating the records and arg + // index remapping may be necessary + boolean migrating = !previousFields.equals(componentNames); + if (migrating) { + // If the order / index of arguments changed in the new record class we have to map + // it, otherwise we pass the wrong arguments to the constructor + int[] argIndexMapping = new int[fields.length]; + for (int i = 0; i < fields.length; i++) { + Field field = fields[i]; + // There may be (removed) null fields due to schema evolution Review Comment: ```suggestion // There may be removed fields due to schema evolution still listed ``` nit: same reasoning as above ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/Java17RecordHelperTest.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for the @{@link JavaRecordHelper}. */ +class Java17RecordHelperTest { + + Field[] fields; + + record TestRecord(int i1, int i2, String s1, String s2) {} + + @BeforeEach + void setup() { + fields = TestRecord.class.getDeclaredFields(); + } + + @Test + void testNoDefaultOrParamMapping() { + JavaRecordHelper<TestRecord> helper = JavaRecordHelper.create(TestRecord.class, fields); + JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setParam(1, 100); + builder.setParam(0, 50); + builder.setParam(3, "test"); + + assertThat(builder.build()).isEqualTo(new TestRecord(50, 100, null, "test")); + } + + @Test + void testNewFieldsAdded() { + // Test restoring from fields [i2, s1] + JavaRecordHelper<TestRecord> helper = + JavaRecordHelper.create(TestRecord.class, Arrays.copyOfRange(fields, 1, 3)); + + JavaRecordHelper<TestRecord>.JavaRecordBuilder builder = helper.newBuilder(); + builder.setParam(0, 100); + builder.setParam(1, "test"); Review Comment: It took me a bit to figure out whether we should use the "new" or "old" parameter indexes here. That's an indication that we could make the method or parameter names of the `setParam` more explicit. But I cannot come up with a better naming. `setValueBasedOnPreviousParamIndex(..)` sounds reasonable. But I'm not a fan of the "previous" label in this context. :thinking: maybe, you have a better idea. ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then + * the serialized data. Used for schema evolution. Null when not needed. + */ + @Nullable private final int[] paramIndexMapping; + /** + * Default record args used for newly introduced primitive fields during schema evolution. Null + * when not needed. + */ + @Nullable private final Object[] defaultConstructorArgs; + + private JavaRecordHelper(Constructor<T> recordConstructor) { + this(recordConstructor, null, null); + } + + private JavaRecordHelper( + Constructor<T> recordConstructor, + int[] argIndexMapping, + Object[] defaultConstructorArgs) { + Preconditions.checkArgument((argIndexMapping == null) == (defaultConstructorArgs == null)); + this.recordConstructor = recordConstructor; + this.paramIndexMapping = argIndexMapping; + this.defaultConstructorArgs = defaultConstructorArgs; + } + + public JavaRecordBuilder newBuilder() { + return new JavaRecordBuilder(); + } + + /** Builder class for incremental record construction. */ + @Internal + final class JavaRecordBuilder { + private final Object[] args; + + JavaRecordBuilder() { + if (defaultConstructorArgs == null) { + args = new Object[recordConstructor.getParameterCount()]; + } else { + args = Arrays.copyOf(defaultConstructorArgs, defaultConstructorArgs.length); + } + } + + T build() { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + void setParam(int i, Object field) { Review Comment: ```suggestion void setParam(int parameterIndex, Object parameterValue) { ``` ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then + * the serialized data. Used for schema evolution. Null when not needed. + */ + @Nullable private final int[] paramIndexMapping; + /** + * Default record args used for newly introduced primitive fields during schema evolution. Null + * when not needed. + */ + @Nullable private final Object[] defaultConstructorArgs; + + private JavaRecordHelper(Constructor<T> recordConstructor) { + this(recordConstructor, null, null); + } + + private JavaRecordHelper( + Constructor<T> recordConstructor, + int[] argIndexMapping, Review Comment: ```suggestion @Nullable int[] argIndexMapping, ``` ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaRecordHelper.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.base.Defaults; + +import javax.annotation.Nullable; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** Utilities for handling Java records nicely in the {@link PojoSerializer}. */ +@Internal +final class JavaRecordHelper<T> { + + /** Record canonical constructor. */ + private final Constructor<T> recordConstructor; + + /** + * Record constructor parameter in case the new constructor has a different parameter order then + * the serialized data. Used for schema evolution. Null when not needed. + */ + @Nullable private final int[] paramIndexMapping; + /** + * Default record args used for newly introduced primitive fields during schema evolution. Null + * when not needed. + */ + @Nullable private final Object[] defaultConstructorArgs; + + private JavaRecordHelper(Constructor<T> recordConstructor) { + this(recordConstructor, null, null); + } + + private JavaRecordHelper( + Constructor<T> recordConstructor, + int[] argIndexMapping, + Object[] defaultConstructorArgs) { + Preconditions.checkArgument((argIndexMapping == null) == (defaultConstructorArgs == null)); + this.recordConstructor = recordConstructor; + this.paramIndexMapping = argIndexMapping; + this.defaultConstructorArgs = defaultConstructorArgs; + } + + public JavaRecordBuilder newBuilder() { + return new JavaRecordBuilder(); + } + + /** Builder class for incremental record construction. */ + @Internal + final class JavaRecordBuilder { + private final Object[] args; + + JavaRecordBuilder() { + if (defaultConstructorArgs == null) { + args = new Object[recordConstructor.getParameterCount()]; + } else { + args = Arrays.copyOf(defaultConstructorArgs, defaultConstructorArgs.length); + } + } + + T build() { + try { + return recordConstructor.newInstance(args); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate record", e); + } + } + + void setParam(int i, Object field) { + if (paramIndexMapping != null) { + args[paramIndexMapping[i]] = field; + } else { + args[i] = field; + } + } + } + + public static <T> JavaRecordHelper<T> create(Class<T> clazz, Field[] fields) { + try { + Object[] recordComponents = + (Object[]) Class.class.getMethod("getRecordComponents").invoke(clazz); + + Class<?>[] componentTypes = new Class[recordComponents.length]; + List<String> componentNames = new ArrayList<>(recordComponents.length); + + // We need to use reflection to access record components as they are not available in + // before Java 14 + Method getType = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getType"); + Method getName = + Class.forName("java.lang.reflect.RecordComponent").getMethod("getName"); + for (int i = 0; i < recordComponents.length; i++) { + componentNames.add((String) getName.invoke(recordComponents[i])); + componentTypes[i] = (Class<?>) getType.invoke(recordComponents[i]); + } + Constructor<T> recordConstructor = clazz.getDeclaredConstructor(componentTypes); + recordConstructor.setAccessible(true); + + List<String> previousFields = + Arrays.stream(fields) + // There may be (removed) null fields due to schema evolution + .filter(Objects::nonNull) + .map(Field::getName) + .collect(Collectors.toList()); Review Comment: ```suggestion .toList(); ``` nit: Feel free to ignore these kind of suggestions if you think it's too much. I'd totally understand. I'm just mentioning it because Intellij raises the warning (at least in my setup). I didn't do any changes to Intellij's default suggestions config. Fixing the warning at the beginning makes me hope that there are less mini PRs created in the future because someone found a (kind of reasonable) warning raised by Intellij. ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ########## @@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private transient ClassLoader cl; + private final boolean isRecord; + + private transient JavaRecordHelper<T> recordHelper; Review Comment: ```suggestion @Nullable private transient JavaRecordHelper<T> recordHelper; ``` ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java: ########## @@ -89,6 +89,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private transient ClassLoader cl; + private final boolean isRecord; + + private transient JavaRecordSerializationHelper<T> recordHelper; + /** Constructor to create a new {@link PojoSerializer}. */ @SuppressWarnings("unchecked") public PojoSerializer( Review Comment: :+1: I saw your JavaDoc extension for `Types.POJO`. But unfortunately, we have redundant documentation in `PojoTypeInfo` for this one. What about updating `PojoTypeInfo` and replacing the redundant docs with a `@see PojoTypeInfo` in the JavaDoc of `Types.POJO`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org