JingsongLi commented on code in PR #771: URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1154174589
########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java: ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.factories; + +import org.apache.paimon.flink.log.LogStoreTableFactory; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.FallbackKey; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.factories.DecodingFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableFactory.Context; +import org.apache.flink.table.factories.EncodingFormatFactory; +import org.apache.flink.table.factories.Factory; +import 
org.apache.flink.table.factories.FormatFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap; +import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey; +import static org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER; + +/** Utility for working with {@link Factory}s. */ +public final class FlinkFactoryUtil { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkFactoryUtil.class); + + /** + * Describes the property version. This can be used for backwards compatibility in case the + * property format changes. + */ + public static final ConfigOption<Integer> PROPERTY_VERSION = Review Comment: Useless ########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java: ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.factories; + +import org.apache.paimon.flink.log.LogStoreTableFactory; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.FallbackKey; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.factories.DecodingFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableFactory.Context; +import org.apache.flink.table.factories.EncodingFormatFactory; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FormatFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap; +import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey; +import static org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER; + +/** Utility for working with {@link Factory}s. 
*/ +public final class FlinkFactoryUtil { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkFactoryUtil.class); + + /** + * Describes the property version. This can be used for backwards compatibility in case the + * property format changes. + */ + public static final ConfigOption<Integer> PROPERTY_VERSION = + ConfigOptions.key("property-version") + .intType() + .defaultValue(1) + .withDescription( + "Version of the overall property design. This option is meant for future backwards compatibility."); + + public static final ConfigOption<String> CONNECTOR = + ConfigOptions.key("connector") + .stringType() + .noDefaultValue() + .withDescription( + "Uniquely identifies the connector of a dynamic table that is used for accessing data in " + + "an external system. Its value is used during table source and table sink discovery."); + + public static final ConfigOption<String> FORMAT = + ConfigOptions.key("format") + .stringType() + .noDefaultValue() + .withDescription( + "Defines the format identifier for encoding data. " + + "The identifier is used to discover a suitable format factory."); + + /** + * Suffix for keys of {@link ConfigOption} in case a connector requires multiple formats (e.g. + * for both key and value). + * + * <p>See {@link #createFlinkTableFactoryHelper(LogStoreTableFactory, Context)} for + * more information. + */ + public static final String FORMAT_SUFFIX = ".format"; + + /** + * Creates a utility that helps in discovering formats, merging options with {@link + * DynamicTableFactory.Context#getEnrichmentOptions()} and validating them all for a {@link + * LogStoreTableFactory}. 
+ * + * <p>The following example sketches the usage: + * + * <pre>{@code + * // in createDynamicTableSource() + * helper = FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context); + * + * keyFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT); + * valueFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT); + * + * helper.validate(); + * + * ... // construct connector with discovered formats + * }</pre> + * + * <p>Note: The format option parameter of {@link Review Comment: We don't need such complicated documentation. ########## paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java: ########## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.factories; + +import org.apache.paimon.flink.log.LogStoreTableFactory; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.FallbackKey; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.factories.DecodingFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableFactory.Context; +import org.apache.flink.table.factories.EncodingFormatFactory; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FormatFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap; +import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey; +import static org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER; + +/** Utility for working with {@link Factory}s. */ +public final class FlinkFactoryUtil { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkFactoryUtil.class); + + /** + * Describes the property version. 
This can be used for backwards compatibility in case the + * property format changes. + */ + public static final ConfigOption<Integer> PROPERTY_VERSION = + ConfigOptions.key("property-version") + .intType() + .defaultValue(1) + .withDescription( + "Version of the overall property design. This option is meant for future backwards compatibility."); + + public static final ConfigOption<String> CONNECTOR = Review Comment: Useless -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org