luoyuxia commented on code in PR #20394:
URL: https://github.com/apache/flink/pull/20394#discussion_r935043184


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java:
##########
@@ -57,25 +57,31 @@ class FileSystemCommitter implements Serializable {
     private final boolean overwrite;
     private final Path tmpPath;
     private final int partitionColumnSize;
+    private final LinkedHashMap<String, String> staticPartitions;
 
     FileSystemCommitter(
             FileSystemFactory factory,
             TableMetaStoreFactory metaStoreFactory,
             boolean overwrite,
             Path tmpPath,
-            int partitionColumnSize) {
+            int partitionColumnSize,
+            LinkedHashMap<String, String> staticPartitions) {
         this.factory = factory;
         this.metaStoreFactory = metaStoreFactory;
         this.overwrite = overwrite;
         this.tmpPath = tmpPath;
         this.partitionColumnSize = partitionColumnSize;
+        this.staticPartitions = staticPartitions;
     }
 
     /** For committing job's output after successful batch job completion. */
     public void commitPartitions() throws Exception {
         FileSystem fs = factory.create(tmpPath.toUri());
         List<Path> taskPaths = listTaskTemporaryPaths(fs, tmpPath);
-
+        if (taskPaths.isEmpty() && !staticPartitions.isEmpty()) {

Review Comment:
   Could you please move this logic into the following branch? A partition 
with no data is just a special case of the partitioned path.
   ```java
   if (partitionColumnSize > 0) {
   xxx
   }
   ```
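
A rough, self-contained sketch of that restructuring follows. It models only the branching, not the real committer: `CommitBranchSketch`, `Action`, and `chooseAction` are hypothetical names introduced for illustration, and task paths are plain strings instead of Flink `Path` objects.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the branch layout suggested above; not Flink code. */
public class CommitBranchSketch {

    /** Hypothetical labels for the loader call each branch would make. */
    enum Action { LOAD_EMPTY_PARTITION, LOAD_PARTITIONS, LOAD_NON_PARTITION }

    static Action chooseAction(
            int partitionColumnSize,
            List<String> taskPaths,
            Map<String, String> staticPartitions) {
        if (partitionColumnSize > 0) {
            // The empty static partition is handled as a special case
            // inside the partitioned branch, not before it.
            if (taskPaths.isEmpty() && !staticPartitions.isEmpty()) {
                return Action.LOAD_EMPTY_PARTITION;
            }
            return Action.LOAD_PARTITIONS;
        }
        return Action.LOAD_NON_PARTITION;
    }

    public static void main(String[] args) {
        Map<String, String> staticPartitions = new LinkedHashMap<>();
        staticPartitions.put("dt", "2022-08-01"); // made-up partition spec
        System.out.println(
                chooseAction(1, Collections.<String>emptyList(), staticPartitions));
    }
}
```

With this layout a non-partitioned table never reaches the empty-partition check, even if `staticPartitions` happened to be non-empty.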



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -78,6 +78,31 @@ public void loadNonPartition(List<Path> srcDirs) throws Exception {
         overwriteAndRenameFiles(srcDirs, tableLocation);
     }
 
+    /**
+     * The flink job does not generate data, but the corresponding partition needs to be created.
+     *
+     * <p>The partition does not exist, create it.
+     *
+     * <p>The partition exists:
+     *
+     * <pre>
+     *      if overwrite is true, delete the path, then create it;
+     *      if overwrite is false, do nothing;
+     * </pre>
+     */
+    public void loadEmptyPartition(LinkedHashMap<String, String> partSpec) throws Exception {
+        Optional<Path> pathFromMeta = metaStore.getPartition(partSpec);
+        if (pathFromMeta.isPresent() && !overwrite) {
+            return;
+        }
+        Path path = new Path(metaStore.getLocationPath(), generatePartitionPath(partSpec));
+        if (pathFromMeta.isPresent() && overwrite) {

Review Comment:
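   Isn't the `overwrite` check here redundant? The early return above already 
covers the case where the partition exists and `overwrite` is false.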
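
To illustrate why the second check adds nothing, here is a small self-contained model of the method's control flow. It is a sketch under assumptions: `EmptyPartitionSketch` and the `"delete"` / `"mkdirs"` labels are hypothetical, and the operations are inferred from the Javadoc in the diff rather than from the full method body.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical model of loadEmptyPartition's branching; not Flink code. */
public class EmptyPartitionSketch {

    /** Returns, in order, the file-system steps the method would take. */
    static List<String> loadEmptyPartition(Optional<String> pathFromMeta, boolean overwrite) {
        List<String> ops = new ArrayList<>();
        if (pathFromMeta.isPresent() && !overwrite) {
            return ops; // partition exists and must not be overwritten
        }
        // Past the guard, "partition exists" already implies overwrite == true,
        // so testing overwrite again would not change the outcome.
        if (pathFromMeta.isPresent()) {
            ops.add("delete");
        }
        ops.add("mkdirs");
        return ops;
    }

    public static void main(String[] args) {
        System.out.println(loadEmptyPartition(Optional.of("/t/dt=1"), true));
        System.out.println(loadEmptyPartition(Optional.<String>empty(), false));
    }
}
```

All four combinations of "partition exists" and `overwrite` behave as the Javadoc describes with the single `isPresent()` check.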



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java:
##########
@@ -78,6 +78,31 @@ public void loadNonPartition(List<Path> srcDirs) throws Exception {
         overwriteAndRenameFiles(srcDirs, tableLocation);
     }
 
+    /**
+     * The flink job does not generate data, but the corresponding partition needs to be created.

Review Comment:
   ```suggestion
        * The flink job does not write data to the partition, but the corresponding partition needs to be created.
   ```



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -61,7 +61,12 @@ private void createFile(java.nio.file.Path parent, String path, String... files)
     void testPartition() throws Exception {

Review Comment:
   Please also add tests here for committing an empty partition.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java:
##########
@@ -940,6 +940,100 @@ public void testUnsupportedOperation() {
         }
     }
 
+    @Test
+    public void testInsertIntoNotExistStaticPartitionWithoutData() throws 
Exception {

Review Comment:
   I think these tests can be moved to `HiveTableSinkITCase` and combined into 
a single test.
   



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java:
##########
@@ -104,12 +104,14 @@ public void testOverwriteWithEmptySource() throws Exception {
                             tableEnv.executeSql("select * from destp order by x").collect());
             assertThat(results.toString()).isEqualTo("[+I[1, 1], +I[2, 2]]");
             // static partitioned table
+            // see FLINK-28720, The semantics of overwrite is to overwrite the original data, so the

Review Comment:
   I think `see FLINK-28720,` can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
