JingsongLi commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1154174589


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.FallbackKey;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
+import static 
org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
+import static 
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link Factory}s. */
+public final class FlinkFactoryUtil {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkFactoryUtil.class);
+
+    /**
+     * Describes the property version. This can be used for backwards 
compatibility in case the
+     * property format changes.
+     */
+    public static final ConfigOption<Integer> PROPERTY_VERSION =

Review Comment:
   This option (`PROPERTY_VERSION`) is useless here; please remove it.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.FallbackKey;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
+import static 
org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
+import static 
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link Factory}s. */
+public final class FlinkFactoryUtil {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkFactoryUtil.class);
+
+    /**
+     * Describes the property version. This can be used for backwards 
compatibility in case the
+     * property format changes.
+     */
+    public static final ConfigOption<Integer> PROPERTY_VERSION =
+            ConfigOptions.key("property-version")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Version of the overall property design. This 
option is meant for future backwards compatibility.");
+
+    public static final ConfigOption<String> CONNECTOR =
+            ConfigOptions.key("connector")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Uniquely identifies the connector of a dynamic 
table that is used for accessing data in "
+                                    + "an external system. Its value is used 
during table source and table sink discovery.");
+
+    public static final ConfigOption<String> FORMAT =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a 
suitable format factory.");
+
+    /**
+     * Suffix for keys of {@link ConfigOption} in case a connector requires 
multiple formats (e.g.
+     * for both key and value).
+     *
+     * <p>See {@link #createFlinkTableFactoryHelper(LogStoreTableFactory, 
Context)} for
+     * more information.
+     */
+    public static final String FORMAT_SUFFIX = ".format";
+
+    /**
+     * Creates a utility that helps in discovering formats, merging options 
with {@link
+     * DynamicTableFactory.Context#getEnrichmentOptions()} and validating them 
all for a {@link
+     * LogStoreTableFactory}.
+     *
+     * <p>The following example sketches the usage:
+     *
+     * <pre>{@code
+     * // in createDynamicTableSource()
+     * helper = FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context);
+     *
+     * keyFormat = 
helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+     * valueFormat = 
helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
+     *
+     * helper.validate();
+     *
+     * ... // construct connector with discovered formats
+     * }</pre>
+     *
+     * <p>Note: The format option parameter of {@link

Review Comment:
   We don't need such complicated documentation.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.FallbackKey;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
+import static 
org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
+import static 
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link Factory}s. */
+public final class FlinkFactoryUtil {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkFactoryUtil.class);
+
+    /**
+     * Describes the property version. This can be used for backwards 
compatibility in case the
+     * property format changes.
+     */
+    public static final ConfigOption<Integer> PROPERTY_VERSION =
+            ConfigOptions.key("property-version")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Version of the overall property design. This 
option is meant for future backwards compatibility.");
+
+    public static final ConfigOption<String> CONNECTOR =

Review Comment:
   This option (`CONNECTOR`) is useless here; please remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to