This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 80bd169 [FLINK-20703][table] HiveSinkCompactionITCase test timeout
80bd169 is described below
commit 80bd16917f42a9cd31189fd7473ac803a4738363
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 22 14:43:05 2020 +0800
[FLINK-20703][table] HiveSinkCompactionITCase test timeout
This closes #14453
---
.../runtime/stream/sql/CompactionITCaseBase.java | 10 +++++++++
.../filesystem/stream/compact/CompactOperator.java | 26 ++++++++++++++++------
2 files changed, 29 insertions(+), 7 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
index caa3a15..cd95a2f 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/CompactionITCaseBase.java
@@ -90,7 +90,17 @@ public abstract class CompactionITCaseBase extends
StreamingTestBase {
protected abstract void createPartitionTable(String path);
@Test
+ public void testSingleParallelism() throws Exception {
+ innerTestNonPartition(1);
+ }
+
+ @Test
public void testNonPartition() throws Exception {
+ innerTestNonPartition(3);
+ }
+
+ public void innerTestNonPartition(int parallelism) throws Exception {
+ env().setParallelism(parallelism);
createTable(resultPath);
tEnv().executeSql("insert into sink_table select * from
my_table").await();
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactOperator.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactOperator.java
index 5351e09..9e1a056 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactOperator.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactOperator.java
@@ -205,10 +205,13 @@ public class CompactOperator<T> extends
AbstractStreamOperator<PartitionCommitIn
long startMillis = System.currentTimeMillis();
+ boolean success = false;
if (paths.size() == 1) {
- // optimizer for single file, we just want an atomic
rename
- doAtomicRename(paths.get(0), target);
- } else {
+ // optimization for a single file
+ success = doSingleFileMove(paths.get(0), target);
+ }
+
+ if (!success) {
doMultiFilesCompact(partition, paths, target);
}
@@ -217,10 +220,18 @@ public class CompactOperator<T> extends
AbstractStreamOperator<PartitionCommitIn
costSeconds, target, paths);
}
- private void doAtomicRename(Path src, Path dst) throws IOException {
- // optimizer for Object Store (For example: S3 Filesystem).
- // just copy bytes
- RecoverableWriter writer = fileSystem.createRecoverableWriter();
+ private boolean doSingleFileMove(Path src, Path dst) throws IOException
{
+ // We cannot rename, because we need to keep the original file for
failover
+ RecoverableWriter writer;
+ try {
+ writer = fileSystem.createRecoverableWriter();
+ } catch (UnsupportedOperationException ignore) {
+ // Some writers do not support RecoverableWriter, so we
fall back to per-record moving.
+ // For example, see the constructor of
HadoopRecoverableWriter. Although it does not support
+ // RecoverableWriter,
HadoopPathBasedBulkFormatBuilder can still support streaming writing.
+ return false;
+ }
+
RecoverableFsDataOutputStream out = writer.open(dst);
try (FSDataInputStream in = fileSystem.open(src)) {
IOUtils.copyBytes(in, out, false);
@@ -229,6 +240,7 @@ public class CompactOperator<T> extends
AbstractStreamOperator<PartitionCommitIn
throw t;
}
out.closeForCommit().commit();
+ return true;
}
private void doMultiFilesCompact(String partition, List<Path> files,
Path dst) throws IOException {