leonardBang commented on a change in pull request #11796:
URL: https://github.com/apache/flink/pull/11796#discussion_r414557669



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -75,14 +86,20 @@
         * @param formatProperties format properties.
         */
        public FileSystemTableSink(
+                       boolean isBounded,

Review comment:
       Please add a `@param` Javadoc entry for the new `isBounded` parameter.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -75,14 +86,20 @@
         * @param formatProperties format properties.
         */
        public FileSystemTableSink(
+                       boolean isBounded,
                        TableSchema schema,
                        Path path,
                        List<String> partitionKeys,
                        String defaultPartName,
+                       long rollingFileSize,
+                       long rollingTimeInterval,
                        Map<String, String> formatProperties) {
+               this.isBounded = isBounded;
                this.schema = schema;
                this.path = path;
                this.defaultPartName = defaultPartName;
+               this.rollingFileSize = rollingFileSize;

Review comment:
       As above: please add a `@param` Javadoc entry for `rollingFileSize`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -75,14 +86,20 @@
         * @param formatProperties format properties.
         */
        public FileSystemTableSink(
+                       boolean isBounded,
                        TableSchema schema,
                        Path path,
                        List<String> partitionKeys,
                        String defaultPartName,
+                       long rollingFileSize,
+                       long rollingTimeInterval,
                        Map<String, String> formatProperties) {
+               this.isBounded = isBounded;
                this.schema = schema;
                this.path = path;
                this.defaultPartName = defaultPartName;
+               this.rollingFileSize = rollingFileSize;
+               this.rollingTimeInterval = rollingTimeInterval;

Review comment:
       As above: please add a `@param` Javadoc entry for `rollingTimeInterval`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -284,4 +337,125 @@ public boolean configurePartitionGrouping(boolean supportsGrouping) {
                this.dynamicGrouping = supportsGrouping;
                return dynamicGrouping;
        }
+
+       /**
+        * Table bucket assigner, wrap {@link PartitionComputer}.
+        */
+       private static class TableBucketAssigner implements BucketAssigner<BaseRow, String> {
+
+               private final PartitionComputer<BaseRow> computer;
+
+               private TableBucketAssigner(PartitionComputer<BaseRow> computer) {
+                       this.computer = computer;
+               }
+
+               @Override
+               public String getBucketId(BaseRow element, Context context) {
+                       try {
+                               return PartitionPathUtils.generatePartitionPath(
+                                               computer.generatePartValues(element));
+                       } catch (Exception e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               @Override
+               public SimpleVersionedSerializer<String> getSerializer() {
+                       return SimpleVersionedStringSerializer.INSTANCE;
+               }
+       }
+
+       /**
+        * Table {@link RollingPolicy}, now it is a {@link CheckpointRollingPolicy}.
+        * Because partition commit is hard to support false.

Review comment:
       This note reads strangely; I cannot understand what it means. Could you rephrase it?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionPathUtils.java
##########
@@ -88,7 +88,9 @@ public static String generatePartitionPath(LinkedHashMap<String, String> partiti
                        suffixBuf.append(escapePathName(e.getValue()));
                        i++;
                }
-               suffixBuf.append(Path.SEPARATOR);
+               if (partitionSpec.size() > 0) {

Review comment:
       We can move this check to the beginning of the method and return early, which shortens the code path.
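
       A rough sketch of the early-return shape, not a definitive patch. Only part of the loop is visible in this hunk, so the rest of the loop body is an assumption, and `PartitionPathSketch`, `SEPARATOR`, and the pass-through `escapePathName` are hypothetical stand-ins so the snippet compiles outside Flink:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionPathSketch {

    // Hypothetical stand-in for Path.SEPARATOR.
    private static final String SEPARATOR = "/";

    // Hypothetical stand-in: the real escapePathName escapes special characters,
    // this one passes the name through unchanged.
    private static String escapePathName(String name) {
        return name;
    }

    public static String generatePartitionPath(LinkedHashMap<String, String> partitionSpec) {
        // Check first: with no partition columns there is no path suffix,
        // so we can skip the builder and the loop entirely.
        if (partitionSpec.isEmpty()) {
            return "";
        }
        StringBuilder suffixBuf = new StringBuilder();
        int i = 0;
        for (Map.Entry<String, String> e : partitionSpec.entrySet()) {
            // Assumed loop shape: separate key=value segments with the separator.
            if (i > 0) {
                suffixBuf.append(SEPARATOR);
            }
            suffixBuf.append(escapePathName(e.getKey()));
            suffixBuf.append('=');
            suffixBuf.append(escapePathName(e.getValue()));
            i++;
        }
        // The trailing separator is now unconditional, because the empty case
        // has already returned.
        suffixBuf.append(SEPARATOR);
        return suffixBuf.toString();
    }
}
```

       With the check up front, the `if (partitionSpec.size() > 0)` guard around the trailing separator is no longer needed.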




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

