JingsongLi commented on a change in pull request #12212:
URL: https://github.com/apache/flink/pull/12212#discussion_r426499466



##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
##########
@@ -23,137 +23,104 @@
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
 import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
 import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader;
 import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static 
org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
-import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
 import static 
org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
 
 /**
  * Parquet {@link FileSystemFormatFactory} for file system.
  */
 public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory 
{
 
-       public static final ConfigOption<Boolean> UTC_TIMEZONE = 
key("format.utc-timezone")
+       public static final String IDENTIFIER = "parquet";
+
+       public static final ConfigOption<Boolean> UTC_TIMEZONE = 
key("utc-timezone")
                        .booleanType()
                        .defaultValue(false)
                        .withDescription("Use UTC timezone or local timezone to 
the conversion between epoch" +
                                        " time and LocalDateTime. Hive 
0.x/1.x/2.x use local timezone. But Hive 3.x" +
                                        " use UTC timezone");
 
-       /**
-        * Prefix for parquet-related properties, besides format, start with 
"parquet".
-        * See more in {@link ParquetOutputFormat}.
-        * - parquet.compression
-        * - parquet.block.size
-        * - parquet.page.size
-        * - parquet.dictionary.page.size
-        * - parquet.writer.max-padding
-        * - parquet.enable.dictionary
-        * - parquet.validation
-        * - parquet.writer.version
-        * ...
-        */
-       public static final String PARQUET_PROPERTIES = "format.parquet";
-
        @Override
-       public Map<String, String> requiredContext() {
-               Map<String, String> context = new HashMap<>();
-               context.put(FORMAT, "parquet");
-               return context;
+       public String factoryIdentifier() {
+               return IDENTIFIER;
        }
 
        @Override
-       public List<String> supportedProperties() {
-               return Arrays.asList(
-                               UTC_TIMEZONE.key(),
-                               PARQUET_PROPERTIES + ".*"
-               );
+       public Set<ConfigOption<?>> requiredOptions() {
+               return new HashSet<>();
        }
 
-       private static boolean isUtcTimestamp(DescriptorProperties properties) {
-               return properties.getOptionalBoolean(UTC_TIMEZONE.key())
-                               .orElse(UTC_TIMEZONE.defaultValue());
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(UTC_TIMEZONE);
+               // support "parquet.*"

Review comment:
       As far as I know, this is hard, because we don't know how many options 
Parquet and ORC support.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to