AHeise commented on a change in pull request #15304: URL: https://github.com/apache/flink/pull/15304#discussion_r685505030
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/factories/BooleanSchemaFactory.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.schema.factories; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.connector.pulsar.common.schema.PulsarSchemaFactory; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.BooleanSchema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.util.Objects; + +/** The schema factory for pulsar's {@link BooleanSchema}. 
*/ +public class BooleanSchemaFactory implements PulsarSchemaFactory<Boolean> { Review comment: I think this and 90% of the other factories can be replaced with ``` public class PrimitiveSchemaFactory<T> implements PulsarSchemaFactory<T> { private final Schema<T> schema; private final TypeInformation<T> typeInformation; public PrimitiveSchemaFactory(Schema<T> schema, TypeInformation<T> typeInformation) { this.schema = schema; this.typeInformation = typeInformation; } @Override public SchemaType type() { return schema.getSchemaInfo().getType(); } @Override public boolean supported(SchemaInfo info) { return info.getType() == type() && Objects.equals(schema.getSchemaInfo().getName(), info.getName()); } @Override public Schema<T> createSchema(SchemaInfo info) { return schema; } @Override public TypeInformation<T> createTypeInfo(SchemaInfo info) { return typeInformation; } } ``` We still need the interface for complex types (map, avro, json, protobuf). ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import org.apache.flink.connector.pulsar.common.schema.factories.BooleanSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufferSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.BytesSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DoubleSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.FloatSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.InstantSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.IntSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.KeyValueSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateTimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalTimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LongSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.NoneSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ShortSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.StringSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.TimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.TimestampSchemaFactory; +import 
org.apache.flink.util.InstantiationUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.util.Collections.emptySet; + +/** + * Util class for pulsar schema. Register all the {@link PulsarSchemaFactory} in this class and + * provide the {@link TypeInformation} or {@link SerializablePulsarSchema} conversion. + */ +@Internal +public final class PulsarSchemaUtils { + + private static final ConcurrentMap<SchemaType, Set<PulsarSchemaFactory<?>>> FACTORY_REGISTER = + new ConcurrentHashMap<>(); + + static { + // Register the predefined schema factories here. + registerSchemaFactory(ByteSchemaFactory.class); Review comment: Instead of registering a class and instantiating it with reflection, I'd pass an instance here. `registerSchemaFactory(new ByteSchemaFactory());` That approach allows us to replace it with `registerSchemaFactory(new PrimitiveSchemaFactory(ByteSchema.of(), Types.BYTE));` with the implementation outlined later. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTypeSerializer.java ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.pulsar.client.api.Schema; + +import java.io.IOException; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Wrap the pulsar {@code Schema} into a flink {@code TypeSerializer}. */ +@Internal +public class PulsarSchemaTypeSerializer<T> extends TypeSerializer<T> { Review comment: Where is this used? ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/SerializablePulsarSchema.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils.createSchema; +import static org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils.extractSchemaClass; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** A wrapper for Pulsar {@link Schema}, make sure it is Java serializable. */ +@Internal +public final class SerializablePulsarSchema<T> implements Serializable { + private static final long serialVersionUID = -2561088131419607555L; + + private transient Schema<T> schema; + + public SerializablePulsarSchema() {} + + public SerializablePulsarSchema(Schema<T> schema) { + SchemaInfo info = schema.getSchemaInfo(); + // Validate if this schema is supported. 
+ this.schema = createSchema(info); + } + + public Schema<T> getPulsarSchema() { + return checkNotNull(schema); + } + + public Class<T> getRecordClass() { + return extractSchemaClass(schema); + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + if (schema == null) { Review comment: schema is always non-null. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/SerializablePulsarSchema.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils.createSchema; +import static org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils.extractSchemaClass; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** A wrapper for Pulsar {@link Schema}, make sure it is Java serializable. */ +@Internal +public final class SerializablePulsarSchema<T> implements Serializable { + private static final long serialVersionUID = -2561088131419607555L; + + private transient Schema<T> schema; + + public SerializablePulsarSchema() {} Review comment: Not needed for `Serializable` and your code. Then `schema` is also guaranteed to be non-null. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import org.apache.flink.connector.pulsar.common.schema.factories.BooleanSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufferSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.BytesSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DoubleSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.FloatSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.InstantSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.IntSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.KeyValueSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateTimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalTimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LongSchemaFactory; +import 
org.apache.flink.connector.pulsar.common.schema.factories.NoneSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ShortSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.StringSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.TimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.TimestampSchemaFactory; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.util.Collections.emptySet; + +/** + * Util class for pulsar schema. Register all the {@link PulsarSchemaFactory} in this class and + * provide the {@link TypeInformation} or {@link SerializablePulsarSchema} conversion. + */ +@Internal +public final class PulsarSchemaUtils { + + private static final ConcurrentMap<SchemaType, Set<PulsarSchemaFactory<?>>> FACTORY_REGISTER = + new ConcurrentHashMap<>(); + + static { + // Register the predefined schema factories here. 
+ registerSchemaFactory(ByteSchemaFactory.class); + registerSchemaFactory(ShortSchemaFactory.class); + registerSchemaFactory(IntSchemaFactory.class); + registerSchemaFactory(LongSchemaFactory.class); + registerSchemaFactory(StringSchemaFactory.class); + registerSchemaFactory(FloatSchemaFactory.class); + registerSchemaFactory(DoubleSchemaFactory.class); + registerSchemaFactory(BooleanSchemaFactory.class); + registerSchemaFactory(BytesSchemaFactory.class); + registerSchemaFactory(NoneSchemaFactory.class); + registerSchemaFactory(DateSchemaFactory.class); + registerSchemaFactory(TimeSchemaFactory.class); + registerSchemaFactory(TimestampSchemaFactory.class); + registerSchemaFactory(InstantSchemaFactory.class); + registerSchemaFactory(LocalDateSchemaFactory.class); + registerSchemaFactory(LocalTimeSchemaFactory.class); + registerSchemaFactory(LocalDateTimeSchemaFactory.class); + registerSchemaFactory(KeyValueSchemaFactory.class); + registerSchemaFactory(ByteBufferSchemaFactory.class); + registerSchemaFactory(ByteBufSchemaFactory.class); + } + + private PulsarSchemaUtils() { + // No need to create instance. + } + + private static void registerSchemaFactory(Class<? extends PulsarSchemaFactory> factoryClass) { + PulsarSchemaFactory<?> factory = InstantiationUtil.instantiate(factoryClass); + Set<PulsarSchemaFactory<?>> factories = + FACTORY_REGISTER.computeIfAbsent(factory.type(), f -> new HashSet<>()); + + factories.add(factory); + } + + /** Extract the defined class type from pulsar schema type. 
*/ + public static <T> Class<T> extractSchemaClass(Schema<T> schema) { + Type argument = + ((ParameterizedType) schema.getClass().getGenericSuperclass()) + .getActualTypeArguments()[0]; + + return TypeExtractionUtils.typeToClass(argument); Review comment: ```suggestion return ReflectionUtil.<T>getTemplateType1(schema.getClass()); ``` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/SerializablePulsarSchema.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils.createSchema; +import static org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils.extractSchemaClass; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** A wrapper for Pulsar {@link Schema}, make sure it is Java serializable. */ +@Internal +public final class SerializablePulsarSchema<T> implements Serializable { + private static final long serialVersionUID = -2561088131419607555L; + + private transient Schema<T> schema; + + public SerializablePulsarSchema() {} + + public SerializablePulsarSchema(Schema<T> schema) { + SchemaInfo info = schema.getSchemaInfo(); + // Validate if this schema is supported. 
+ this.schema = createSchema(info); + } + + public Schema<T> getPulsarSchema() { + return checkNotNull(schema); + } + + public Class<T> getRecordClass() { + return extractSchemaClass(schema); + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + if (schema == null) { + oos.writeBoolean(false); + } else { + SchemaInfo info = schema.getSchemaInfo(); + oos.writeUTF(info.getName()); + + byte[] schemaBytes = info.getSchema(); + oos.writeInt(schemaBytes.length); + oos.write(schemaBytes); + + SchemaType type = info.getType(); + oos.writeInt(type.getValue()); + + Map<String, String> properties = info.getProperties(); + oos.writeInt(properties.size()); + for (Map.Entry<String, String> entry : properties.entrySet()) { + oos.writeUTF(entry.getKey()); + oos.writeUTF(entry.getValue()); + } + } + } + + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + if (ois.readBoolean()) { Review comment: This is currently broken: if `schema != null`, no boolean is written. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import org.apache.flink.connector.pulsar.common.schema.factories.BooleanSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufferSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.BytesSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DoubleSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.FloatSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.InstantSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.IntSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.KeyValueSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateTimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalTimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LongSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.NoneSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ShortSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.StringSchemaFactory; +import 
org.apache.flink.connector.pulsar.common.schema.factories.TimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.TimestampSchemaFactory; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.util.Collections.emptySet; + +/** + * Util class for pulsar schema. Register all the {@link PulsarSchemaFactory} in this class and + * provide the {@link TypeInformation} or {@link SerializablePulsarSchema} conversion. + */ +@Internal +public final class PulsarSchemaUtils { + + private static final ConcurrentMap<SchemaType, Set<PulsarSchemaFactory<?>>> FACTORY_REGISTER = + new ConcurrentHashMap<>(); + + static { + // Register the predefined schema factories here. 
+ registerSchemaFactory(ByteSchemaFactory.class); + registerSchemaFactory(ShortSchemaFactory.class); + registerSchemaFactory(IntSchemaFactory.class); + registerSchemaFactory(LongSchemaFactory.class); + registerSchemaFactory(StringSchemaFactory.class); + registerSchemaFactory(FloatSchemaFactory.class); + registerSchemaFactory(DoubleSchemaFactory.class); + registerSchemaFactory(BooleanSchemaFactory.class); + registerSchemaFactory(BytesSchemaFactory.class); + registerSchemaFactory(NoneSchemaFactory.class); + registerSchemaFactory(DateSchemaFactory.class); + registerSchemaFactory(TimeSchemaFactory.class); + registerSchemaFactory(TimestampSchemaFactory.class); + registerSchemaFactory(InstantSchemaFactory.class); + registerSchemaFactory(LocalDateSchemaFactory.class); + registerSchemaFactory(LocalTimeSchemaFactory.class); + registerSchemaFactory(LocalDateTimeSchemaFactory.class); + registerSchemaFactory(KeyValueSchemaFactory.class); + registerSchemaFactory(ByteBufferSchemaFactory.class); + registerSchemaFactory(ByteBufSchemaFactory.class); + } + + private PulsarSchemaUtils() { + // No need to create instance. + } + + private static void registerSchemaFactory(Class<? extends PulsarSchemaFactory> factoryClass) { + PulsarSchemaFactory<?> factory = InstantiationUtil.instantiate(factoryClass); + Set<PulsarSchemaFactory<?>> factories = + FACTORY_REGISTER.computeIfAbsent(factory.type(), f -> new HashSet<>()); + + factories.add(factory); + } + + /** Extract the defined class type from pulsar schema type. */ + public static <T> Class<T> extractSchemaClass(Schema<T> schema) { + Type argument = + ((ParameterizedType) schema.getClass().getGenericSuperclass()) + .getActualTypeArguments()[0]; + + return TypeExtractionUtils.typeToClass(argument); + } + + /** + * Pulsar has a hugh set of built-in schemas. We can create them by the given {@link + * SchemaInfo}. + * + * <p>Supported schema check list. 
+ * + * <pre>{@code + * - [ ] AvroSchema + * - [x] BooleanSchema + * - [x] ByteBufferSchema + * - [x] ByteBufSchema + * - [x] ByteSchema + * - [x] BytesSchema + * - [x] DateSchema + * - [x] DoubleSchema + * - [x] FloatSchema + * - [x] InstantSchema + * - [x] IntSchema + * - [x] KeyValueSchemaImpl + * - [ ] JSONSchema + * - [x] LocalDateSchema + * - [x] LocalDateTimeSchema + * - [x] LocalTimeSchema + * - [x] LongSchema + * - [ ] ProtobufNativeSchema + * - [ ] ProtobufSchema + * - [x] ShortSchema + * - [x] StringSchema + * - [x] TimeSchema + * - [x] TimestampSchema + * }</pre> + */ + @SuppressWarnings("unchecked") + public static <T> Schema<T> createSchema(SchemaInfo info) { + return FACTORY_REGISTER.getOrDefault(info.getType(), emptySet()).stream() Review comment: When do we actually have more than one type for a given `SchemaInfo`? I was thinking that we should rather fail if we have conflicts instead of picking _some_ candidate. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import org.apache.flink.connector.pulsar.common.schema.factories.BooleanSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufferSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.BytesSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DoubleSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.FloatSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.InstantSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.IntSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.KeyValueSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateTimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalTimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LongSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.NoneSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ShortSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.StringSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.TimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.TimestampSchemaFactory; +import 
org.apache.flink.util.InstantiationUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.util.Collections.emptySet; + +/** + * Util class for pulsar schema. Register all the {@link PulsarSchemaFactory} in this class and + * provide the {@link TypeInformation} or {@link SerializablePulsarSchema} conversion. + */ +@Internal +public final class PulsarSchemaUtils { + + private static final ConcurrentMap<SchemaType, Set<PulsarSchemaFactory<?>>> FACTORY_REGISTER = Review comment: No need for synchronization here. The map is fully filled in `static {}`. You could even use `ImmutableMap.Builder` to create the map. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/SerializablePulsarSchema.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils.createSchema; +import static org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils.extractSchemaClass; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** A wrapper for Pulsar {@link Schema}, make sure it is Java serializable. */ +@Internal +public final class SerializablePulsarSchema<T> implements Serializable { + private static final long serialVersionUID = -2561088131419607555L; + + private transient Schema<T> schema; + + public SerializablePulsarSchema() {} + + public SerializablePulsarSchema(Schema<T> schema) { + SchemaInfo info = schema.getSchemaInfo(); + // Validate if this schema is supported. + this.schema = createSchema(info); + } + + public Schema<T> getPulsarSchema() { + return checkNotNull(schema); Review comment: schema is always non-null. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.schema; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import org.apache.flink.connector.pulsar.common.schema.factories.BooleanSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteBufferSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ByteSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.BytesSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.DoubleSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.FloatSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.InstantSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.IntSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.KeyValueSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LocalDateTimeSchemaFactory; +import 
org.apache.flink.connector.pulsar.common.schema.factories.LocalTimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.LongSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.NoneSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.ShortSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.StringSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.TimeSchemaFactory; +import org.apache.flink.connector.pulsar.common.schema.factories.TimestampSchemaFactory; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.util.Collections.emptySet; + +/** + * Util class for pulsar schema. Register all the {@link PulsarSchemaFactory} in this class and + * provide the {@link TypeInformation} or {@link SerializablePulsarSchema} conversion. + */ +@Internal +public final class PulsarSchemaUtils { + + private static final ConcurrentMap<SchemaType, Set<PulsarSchemaFactory<?>>> FACTORY_REGISTER = + new ConcurrentHashMap<>(); + + static { + // Register the predefined schema factories here. 
+ registerSchemaFactory(ByteSchemaFactory.class); + registerSchemaFactory(ShortSchemaFactory.class); + registerSchemaFactory(IntSchemaFactory.class); + registerSchemaFactory(LongSchemaFactory.class); + registerSchemaFactory(StringSchemaFactory.class); + registerSchemaFactory(FloatSchemaFactory.class); + registerSchemaFactory(DoubleSchemaFactory.class); + registerSchemaFactory(BooleanSchemaFactory.class); + registerSchemaFactory(BytesSchemaFactory.class); + registerSchemaFactory(NoneSchemaFactory.class); + registerSchemaFactory(DateSchemaFactory.class); + registerSchemaFactory(TimeSchemaFactory.class); + registerSchemaFactory(TimestampSchemaFactory.class); + registerSchemaFactory(InstantSchemaFactory.class); + registerSchemaFactory(LocalDateSchemaFactory.class); + registerSchemaFactory(LocalTimeSchemaFactory.class); + registerSchemaFactory(LocalDateTimeSchemaFactory.class); + registerSchemaFactory(KeyValueSchemaFactory.class); + registerSchemaFactory(ByteBufferSchemaFactory.class); + registerSchemaFactory(ByteBufSchemaFactory.class); + } + + private PulsarSchemaUtils() { + // No need to create instance. + } + + private static void registerSchemaFactory(Class<? extends PulsarSchemaFactory> factoryClass) { + PulsarSchemaFactory<?> factory = InstantiationUtil.instantiate(factoryClass); + Set<PulsarSchemaFactory<?>> factories = + FACTORY_REGISTER.computeIfAbsent(factory.type(), f -> new HashSet<>()); + + factories.add(factory); + } + + /** Extract the defined class type from pulsar schema type. */ + public static <T> Class<T> extractSchemaClass(Schema<T> schema) { + Type argument = + ((ParameterizedType) schema.getClass().getGenericSuperclass()) + .getActualTypeArguments()[0]; + + return TypeExtractionUtils.typeToClass(argument); + } + + /** + * Pulsar has a huge set of built-in schemas. We can create them by the given {@link + * SchemaInfo}. + * + * <p>Supported schema check list. 
+ * + * <pre>{@code + * - [ ] AvroSchema + * - [x] BooleanSchema + * - [x] ByteBufferSchema + * - [x] ByteBufSchema + * - [x] ByteSchema + * - [x] BytesSchema + * - [x] DateSchema + * - [x] DoubleSchema + * - [x] FloatSchema + * - [x] InstantSchema + * - [x] IntSchema + * - [x] KeyValueSchemaImpl + * - [ ] JSONSchema + * - [x] LocalDateSchema + * - [x] LocalDateTimeSchema + * - [x] LocalTimeSchema + * - [x] LongSchema + * - [ ] ProtobufNativeSchema + * - [ ] ProtobufSchema + * - [x] ShortSchema + * - [x] StringSchema + * - [x] TimeSchema + * - [x] TimestampSchema + * }</pre> + */ + @SuppressWarnings("unchecked") + public static <T> Schema<T> createSchema(SchemaInfo info) { + return FACTORY_REGISTER.getOrDefault(info.getType(), emptySet()).stream() + .filter(factory -> factory.supported(info)) + .findFirst() + .map(factory -> (Schema<T>) factory.createSchema(info)) + .orElseThrow( + () -> + new IllegalArgumentException( + "Retrieve schema instance from this schema info '" + + info + + "' is not supported yet")); + } + + /** + * Convert the {@link Schema} into a flink manageable {@link TypeInformation}. We only support + * all the primitive types in pulsar built-in schema currently. + * + * <p>Please visit <a + * href="http://pulsar.apache.org/docs/en/schema-understand/#schema-type">pulsar + * documentation</a> for detailed schema type clarification. + */ + @SuppressWarnings("unchecked") + public static <T> TypeInformation<T> createTypeInformation(Schema<T> schema) { + // SchemaInfo contains all the required information for deserializing. 
+ SchemaInfo info = schema.getSchemaInfo(); + return FACTORY_REGISTER.getOrDefault(info.getType(), emptySet()).stream() + .filter(factory -> factory.supported(info)) + .findFirst() + .map(factory -> (TypeInformation<T>) factory.createTypeInfo(info)) + .orElseThrow( + () -> + new IllegalArgumentException( + "Retrieve schema instance from this schema info '" Review comment: ```suggestion "Retrieve type information from this schema info '" ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
