leonardBang commented on a change in pull request #12212:
URL: https://github.com/apache/flink/pull/12212#discussion_r426454025



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -29,6 +29,35 @@
  */
 public class FileSystemOptions {
 
+       public static final ConfigOption<String> PATH = key("path")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("The path of a directory");
+
+       public static final ConfigOption<String> PARTITION_DEFAULT_NAME = 
key("partition.default-name")
+                       .stringType()
+                       .defaultValue("__DEFAULT_PARTITION__")
+                       .withDescription("The default partition name in case 
the dynamic partition" +
+                                       " column value is null/empty string");
+
+       public static final ConfigOption<Long> SINK_ROLLING_POLICY_FILE_SIZE = 
key("sink.rolling-policy.file-size")
+                       .longType()
+                       .defaultValue(1024L * 1024L * 128L)
+                       .withDescription("The maximum part file size before 
rolling (by default 128MB).");
+
+       public static final ConfigOption<Long> 
SINK_ROLLING_POLICY_TIME_INTERVAL = key("sink.rolling-policy.time.interval")

Review comment:
       ```suggestion
        public static final ConfigOption<Long> 
SINK_ROLLING_POLICY_TIME_INTERVAL = key("sink.rolling-policy.time-interval")
   ```
   How about renaming it to `sink.rolling-policy.time-interval`, which is closer to 
FLIP-122's style?

##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
##########
@@ -23,137 +23,104 @@
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
 import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
 import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader;
 import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static 
org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
-import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
 import static 
org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
 
 /**
  * Parquet {@link FileSystemFormatFactory} for file system.
  */
 public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory 
{
 
-       public static final ConfigOption<Boolean> UTC_TIMEZONE = 
key("format.utc-timezone")
+       public static final String IDENTIFIER = "parquet";
+
+       public static final ConfigOption<Boolean> UTC_TIMEZONE = 
key("utc-timezone")
                        .booleanType()
                        .defaultValue(false)
                        .withDescription("Use UTC timezone or local timezone to 
the conversion between epoch" +
                                        " time and LocalDateTime. Hive 
0.x/1.x/2.x use local timezone. But Hive 3.x" +
                                        " use UTC timezone");
 
-       /**
-        * Prefix for parquet-related properties, besides format, start with 
"parquet".
-        * See more in {@link ParquetOutputFormat}.
-        * - parquet.compression
-        * - parquet.block.size
-        * - parquet.page.size
-        * - parquet.dictionary.page.size
-        * - parquet.writer.max-padding
-        * - parquet.enable.dictionary
-        * - parquet.validation
-        * - parquet.writer.version
-        * ...
-        */
-       public static final String PARQUET_PROPERTIES = "format.parquet";
-
        @Override
-       public Map<String, String> requiredContext() {
-               Map<String, String> context = new HashMap<>();
-               context.put(FORMAT, "parquet");
-               return context;
+       public String factoryIdentifier() {
+               return IDENTIFIER;
        }
 
        @Override
-       public List<String> supportedProperties() {
-               return Arrays.asList(
-                               UTC_TIMEZONE.key(),
-                               PARQUET_PROPERTIES + ".*"
-               );
+       public Set<ConfigOption<?>> requiredOptions() {
+               return new HashSet<>();
        }
 
-       private static boolean isUtcTimestamp(DescriptorProperties properties) {
-               return properties.getOptionalBoolean(UTC_TIMEZONE.key())
-                               .orElse(UTC_TIMEZONE.defaultValue());
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(UTC_TIMEZONE);
+               // support "parquet.*"

Review comment:
       Could we list all supported options here? If yes, please add them for ORC too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to