This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 80bd169  [FLINK-20703][table] HiveSinkCompactionITCase test timeout
80bd169 is described below

commit 80bd16917f42a9cd31189fd7473ac803a4738363
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 22 14:43:05 2020 +0800

    [FLINK-20703][table] HiveSinkCompactionITCase test timeout
    
    This closes #14453
---
 .../runtime/stream/sql/CompactionITCaseBase.java   | 10 +++++++++
 .../filesystem/stream/compact/CompactOperator.java | 26 ++++++++++++++++------
 2 files changed, 29 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
index caa3a15..cd95a2f 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
@@ -90,7 +90,17 @@ public abstract class CompactionITCaseBase extends 
StreamingTestBase {
        protected abstract void createPartitionTable(String path);
 
        @Test
+       public void testSingleParallelism() throws Exception {
+               innerTestNonPartition(1);
+       }
+
+       @Test
        public void testNonPartition() throws Exception {
+               innerTestNonPartition(3);
+       }
+
+       public void innerTestNonPartition(int parallelism) throws Exception {
+               env().setParallelism(parallelism);
                createTable(resultPath);
                tEnv().executeSql("insert into sink_table select * from 
my_table").await();
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactOperator.java
index 5351e09..9e1a056 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactOperator.java
@@ -205,10 +205,13 @@ public class CompactOperator<T> extends 
AbstractStreamOperator<PartitionCommitIn
 
                long startMillis = System.currentTimeMillis();
 
+               boolean success = false;
                if (paths.size() == 1) {
-                       // optimizer for single file, we just want an atomic 
rename
-                       doAtomicRename(paths.get(0), target);
-               } else {
+                       // optimizer for single file
+                       success = doSingleFileMove(paths.get(0), target);
+               }
+
+               if (!success) {
                        doMultiFilesCompact(partition, paths, target);
                }
 
@@ -217,10 +220,18 @@ public class CompactOperator<T> extends 
AbstractStreamOperator<PartitionCommitIn
                                costSeconds, target, paths);
        }
 
-       private void doAtomicRename(Path src, Path dst) throws IOException {
-               // optimizer for Object Store (For example: S3 Filesystem).
-               // just copy bytes
-               RecoverableWriter writer = fileSystem.createRecoverableWriter();
+       private boolean doSingleFileMove(Path src, Path dst) throws IOException 
{
+               // We can not rename, because we need to keep original file for 
failover
+               RecoverableWriter writer;
+               try {
+                       writer = fileSystem.createRecoverableWriter();
+               } catch (UnsupportedOperationException ignore) {
+                       // Some writer not support RecoverableWriter, so 
fallback to per record moving.
+                       // For example, see the constructor of 
HadoopRecoverableWriter. Although it not support
+                       // RecoverableWriter, but 
HadoopPathBasedBulkFormatBuilder can support streaming writing.
+                       return false;
+               }
+
                RecoverableFsDataOutputStream out = writer.open(dst);
                try (FSDataInputStream in = fileSystem.open(src)) {
                        IOUtils.copyBytes(in, out, false);
@@ -229,6 +240,7 @@ public class CompactOperator<T> extends 
AbstractStreamOperator<PartitionCommitIn
                        throw t;
                }
                out.closeForCommit().commit();
+               return true;
        }
 
        private void doMultiFilesCompact(String partition, List<Path> files, 
Path dst) throws IOException {

Reply via email to