This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dd382894b4d [FLINK-34121][core] Introduce pipeline.force-kryo-avro to
control whether to force registration of Avro serializer with Kryo
dd382894b4d is described below
commit dd382894b4da5b2d153913c92b6679fbd877b18b
Author: JunRuiLee <[email protected]>
AuthorDate: Tue Jan 30 16:43:46 2024 +0800
[FLINK-34121][core] Introduce pipeline.force-kryo-avro to control whether
to force registration of Avro serializer with Kryo
---
.../generated/pipeline_configuration.html | 6 ++++
.../api/common/serialization/SerializerConfig.java | 11 +++++++
.../common/serialization/SerializerConfigImpl.java | 17 ++++++++++
.../apache/flink/api/java/typeutils/AvroUtils.java | 20 ++++++++++++
.../typeutils/runtime/kryo/KryoSerializer.java | 25 ++++++++++++---
.../flink/configuration/PipelineOptions.java | 18 +++++++++++
.../avro/AvroKryoSerializerRegistrationsTest.java | 36 ++++++++++++++++++++++
.../flink/table/catalog/DataTypeFactoryImpl.java | 4 +++
8 files changed, 132 insertions(+), 5 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/pipeline_configuration.html
b/docs/layouts/shortcodes/generated/pipeline_configuration.html
index 825be987aa5..f8b511206c7 100644
--- a/docs/layouts/shortcodes/generated/pipeline_configuration.html
+++ b/docs/layouts/shortcodes/generated/pipeline_configuration.html
@@ -50,6 +50,12 @@
<td>Boolean</td>
<td>If enabled, forces TypeExtractor to use Kryo serializer for
POJOS even though we could analyze as POJO. In some cases this might be
preferable. For example, when using interfaces with subclasses that cannot be
analyzed as POJO.</td>
</tr>
+ <tr>
+ <td><h5>pipeline.force-kryo-avro</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Boolean</td>
+ <td>Forces Flink to register avro classes in kryo serializer.<br
/><br />Important: Make sure to include the flink-avro module. Otherwise,
nothing will be registered. For backward compatibility, the default value is
empty to conform to the behavior of the older version. That is, always register
avro with kryo, and if flink-avro is not in the class path, register a dummy
serializer. In Flink-2.0, we will set the default value to true.</td>
+ </tr>
<tr>
<td><h5>pipeline.generic-types</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
index 76642f23df7..0b1e48377f2 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.TernaryBoolean;
import com.esotericsoftware.kryo.Serializer;
@@ -198,6 +199,16 @@ public interface SerializerConfig extends Serializable {
@Internal
void setForceAvro(boolean forceAvro);
+    /**
+     * Sets whether Flink is forced to register Apache Avro classes with the Kryo serializer.
+     *
+     * <p>The method will be converted to private in the next Flink major version after removing
+     * its deprecated caller methods.
+     *
+     * @param forceKryoAvro whether Avro classes should be registered with Kryo
+     */
+    @Internal
+    void setForceKryoAvro(boolean forceKryoAvro);
+
+    /**
+     * Returns whether Flink is forced to register Apache Avro classes with the Kryo serializer.
+     *
+     * @return {@link TernaryBoolean#TRUE} or {@link TernaryBoolean#FALSE} if the option has been
+     *     set explicitly, or {@link TernaryBoolean#UNDEFINED} if it has not been set
+     */
+    TernaryBoolean isForceKryoAvroEnabled();
+
/**
* Sets all relevant options contained in the {@link ReadableConfig} such
as e.g. {@link
* PipelineOptions#FORCE_KRYO}.
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
index 91fdea93bcf..c5dc1a214e9 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TernaryBoolean;
import com.esotericsoftware.kryo.Serializer;
@@ -279,6 +280,19 @@ public final class SerializerConfigImpl implements
SerializerConfig {
configuration.set(PipelineOptions.FORCE_AVRO, forceAvro);
}
+ @Override
+ public void setForceKryoAvro(boolean forceKryoAvro) {
+ configuration.set(PipelineOptions.FORCE_KRYO_AVRO, forceKryoAvro);
+ }
+
+ @Override
+ public TernaryBoolean isForceKryoAvroEnabled() {
+ return configuration
+ .getOptional(PipelineOptions.FORCE_KRYO_AVRO)
+ .map(TernaryBoolean::fromBoolean)
+ .orElse(TernaryBoolean.UNDEFINED);
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof SerializerConfigImpl) {
@@ -351,6 +365,9 @@ public final class SerializerConfigImpl implements
SerializerConfig {
configuration.getOptional(PipelineOptions.GENERIC_TYPES).ifPresent(this::setGenericTypes);
configuration.getOptional(PipelineOptions.FORCE_KRYO).ifPresent(this::setForceKryo);
configuration.getOptional(PipelineOptions.FORCE_AVRO).ifPresent(this::setForceAvro);
+ configuration
+ .getOptional(PipelineOptions.FORCE_KRYO_AVRO)
+ .ifPresent(this::setForceKryoAvro);
configuration
.getOptional(PipelineOptions.KRYO_DEFAULT_SERIALIZERS)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
index c82cbfcbeb1..c10fccf3266 100644
---
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
+++
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroUtils.java
@@ -25,6 +25,7 @@ import
org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import java.util.LinkedHashMap;
+import java.util.Optional;
import static
org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
@@ -57,6 +58,25 @@ public abstract class AvroUtils {
}
}
+    /**
+     * Tries to load the Avro-specific {@link AvroUtils} implementation shipped with flink-avro.
+     *
+     * <p>The absence of flink-avro is signalled to the caller by an empty {@link Optional}, so
+     * callers can skip Avro-specific registrations entirely instead of using a fallback.
+     *
+     * @return the flink-avro {@link AvroUtils} implementation, or {@link Optional#empty()} if its
+     *     class cannot be found via the current thread's context class loader
+     * @throws RuntimeException if the class is found but cannot be instantiated
+     */
+    public static Optional<AvroUtils> tryGetAvroUtils() {
+        // try and load the special AvroUtils from the flink-avro package
+        try {
+            Class<?> clazz =
+                    Class.forName(
+                            AVRO_KRYO_UTILS, false, Thread.currentThread().getContextClassLoader());
+            return Optional.of(clazz.asSubclass(AvroUtils.class).getConstructor().newInstance());
+        } catch (ClassNotFoundException e) {
+            // flink-avro is not on the classpath: report absence instead of failing.
+            return Optional.empty();
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate " + AVRO_KRYO_UTILS + ".", e);
+        }
+    }
+
// ------------------------------------------------------------------------
/**
diff --git
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 1c0c31d2e40..b25175f4175 100644
---
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TernaryBoolean;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
@@ -191,7 +192,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
this.type,
serializerConfig.getRegisteredKryoTypes(),
serializerConfig.getRegisteredTypesWithKryoSerializerClasses(),
-
serializerConfig.getRegisteredTypesWithKryoSerializers());
+
serializerConfig.getRegisteredTypesWithKryoSerializers(),
+ serializerConfig.isForceKryoAvroEnabled());
}
/**
@@ -567,7 +569,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
registeredTypesWithSerializerClasses,
LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
- registeredTypesWithSerializers) {
+ registeredTypesWithSerializers,
+ TernaryBoolean isForceAvroKryoEnabledOpt) {
final LinkedHashMap<String, KryoRegistration> kryoRegistrations = new
LinkedHashMap<>();
@@ -599,8 +602,19 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
registeredTypeWithSerializerEntry.getValue()));
}
- // add Avro support if flink-avro is available; a dummy otherwise
-
AvroUtils.getAvroUtils().addAvroGenericDataArrayRegistration(kryoRegistrations);
+        // Avro registration is driven by pipeline.force-kryo-avro.
+        switch (isForceAvroKryoEnabledOpt) {
+            case UNDEFINED:
+                // Option not set: keep the legacy behavior for backward compatibility and always
+                // register, using flink-avro if available and a dummy otherwise.
+                AvroUtils.getAvroUtils().addAvroGenericDataArrayRegistration(kryoRegistrations);
+                break;
+            case TRUE:
+                // Register only when flink-avro is available; never the dummy serializer.
+                AvroUtils.tryGetAvroUtils()
+                        .ifPresent(
+                                avroUtils ->
+                                        avroUtils.addAvroGenericDataArrayRegistration(
+                                                kryoRegistrations));
+                break;
+            case FALSE:
+            default:
+                // Explicitly disabled: nothing Avro-related is registered.
+                break;
+        }
return kryoRegistrations;
}
@@ -627,7 +641,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
type,
registeredTypes,
registeredTypesWithSerializerClasses,
- registeredTypesWithSerializers);
+ registeredTypesWithSerializers,
+ TernaryBoolean.UNDEFINED);
}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index 0b28242c091..0c2a9f2aaa5 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -146,6 +146,24 @@ public class PipelineOptions {
+ " analyze as POJO. In some cases this
might be preferable. For example, when using interfaces"
+ " with subclasses that cannot be
analyzed as POJO.");
+    /**
+     * Whether Avro classes are registered with Kryo. This option has no default value: when it is
+     * unset, Avro is always registered (with a dummy serializer if flink-avro is absent); when
+     * {@code true}, Avro is registered only if flink-avro is available; when {@code false}, no
+     * Avro registration is added.
+     */
+ public static final ConfigOption<Boolean> FORCE_KRYO_AVRO =
+ key("pipeline.force-kryo-avro")
+ .booleanType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Forces Flink to register avro
classes in kryo serializer.")
+ .linebreak()
+ .linebreak()
+ .text(
+ "Important: Make sure to include
the flink-avro module."
+ + " Otherwise, nothing
will be registered. For backward compatibility,"
+ + " the default value is
empty to conform to the behavior of the older version."
+ + " That is, always
register avro with kryo, and if flink-avro is not in the class"
+ + " path, register a dummy
serializer. In Flink-2.0, we will set the default value to true.")
+ .build());
+
public static final ConfigOption<Boolean> GENERIC_TYPES =
key("pipeline.generic-types")
.booleanType()
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
index 02360237dc4..84c923c3e6e 100644
---
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
@@ -18,11 +18,15 @@
package org.apache.flink.formats.avro;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
+import org.apache.avro.generic.GenericData;
import org.junit.jupiter.api.Test;
import java.io.BufferedReader;
@@ -33,6 +37,8 @@ import java.io.IOException;
import java.io.InputStreamReader;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
/**
@@ -81,6 +87,36 @@ class AvroKryoSerializerRegistrationsTest {
}
}
+    @Test
+    void testEnableForceKryoAvroRegister() {
+        final ExecutionConfig config = new ExecutionConfig();
+        config.getSerializerConfig().setForceKryoAvro(true);
+        final Kryo kryo = createRegistrationRequiredKryo(config);
+        assertThatCode(() -> kryo.getRegistration(GenericData.Array.class))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void testDefaultForceKryoAvroRegister() {
+        // With the option left unset, Avro must still be registered (backward compatibility).
+        final Kryo kryo = createRegistrationRequiredKryo(new ExecutionConfig());
+        assertThatCode(() -> kryo.getRegistration(GenericData.Array.class))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void testDisableForceKryoAvroRegister() {
+        final Configuration conf = new Configuration();
+        conf.set(PipelineOptions.FORCE_KRYO_AVRO, false);
+        final Kryo kryo = createRegistrationRequiredKryo(new ExecutionConfig(conf));
+        assertThatThrownBy(() -> kryo.getRegistration(GenericData.Array.class))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    /** Builds the Kryo instance of an Integer KryoSerializer with registration made mandatory. */
+    private static Kryo createRegistrationRequiredKryo(ExecutionConfig executionConfig) {
+        final Kryo kryo = new KryoSerializer<>(Integer.class, executionConfig).getKryo();
+        kryo.setRegistrationRequired(true);
+        return kryo;
+    }
+
/**
* Creates a Kryo serializer and writes the default registrations out to a
comma separated file
* with one entry per line:
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
index 64afd71307d..8dd7a7c3a40 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
@@ -42,6 +42,7 @@ import
org.apache.flink.table.types.utils.TypeInfoDataTypeConverter;
import javax.annotation.Nullable;
+import java.util.Optional;
import java.util.function.Supplier;
import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
@@ -149,6 +150,9 @@ final class DataTypeFactoryImpl implements DataTypeFactory {
newSerializerConfig.addDefaultKryoSerializer(
c, s.getSerializer()));
+            // Copy pipeline.force-kryo-avro into the NEW config only if it was explicitly set,
+            // so that an unset option stays unset in the copy.
+            Optional.ofNullable(serializerConfig.isForceKryoAvroEnabled().getAsBoolean())
+                    .ifPresent(newSerializerConfig::setForceKryoAvro);
+
serializerConfig
.getDefaultKryoSerializerClasses()
.forEach(newSerializerConfig::addDefaultKryoSerializer);