This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c7ad115a5 [flink] disable compaction when unaware-bucket table writes
in batch mode (#1455)
c7ad115a5 is described below
commit c7ad115a554dd79e8891e55018527434842da2e1
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jun 29 17:39:23 2023 +0800
[flink] disable compaction when unaware-bucket table writes in batch mode
(#1455)
---
.../paimon/flink/sink/UnawareBucketWriteSink.java | 15 ++---
.../org/apache/paimon/flink/CatalogITCaseBase.java | 8 +++
.../flink/UnawareBucketAppendOnlyTableITCase.java | 78 +++++++++++++++++-----
3 files changed, 77 insertions(+), 24 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
index 84e1a4818..ad10f96ba 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
@@ -57,18 +57,17 @@ public class UnawareBucketWriteSink extends FileStoreSink {
// do the actually writing action, no snapshot generated in this stage
DataStream<Committable> written = doWrite(input, initialCommitUser,
parallelism);
+ boolean isStreamingMode =
+ input.getExecutionEnvironment()
+ .getConfiguration()
+ .get(ExecutionOptions.RUNTIME_MODE)
+ == RuntimeExecutionMode.STREAMING;
// if enable compaction, we need to add compaction topology to this job
- if (enableCompaction) {
- boolean isStreamingMode =
- input.getExecutionEnvironment()
- .getConfiguration()
- .get(ExecutionOptions.RUNTIME_MODE)
- == RuntimeExecutionMode.STREAMING;
-
+ if (enableCompaction && isStreamingMode) {
UnawareBucketCompactionTopoBuilder builder =
new UnawareBucketCompactionTopoBuilder(
input.getExecutionEnvironment(), table.name(),
table);
- builder.withContinuousMode(isStreamingMode);
+ builder.withContinuousMode(true);
written =
written.union(builder.fetchUncommitted(initialCommitUser));
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index c15a6841a..d49afc681 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -173,6 +173,14 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
return id == null ? null : snapshotManager.snapshot(id);
}
+ @Nullable
+ protected Snapshot findSnapshot(String tableName, long snapshotId) {
+ SnapshotManager snapshotManager =
+ new SnapshotManager(LocalFileIO.create(),
getTableDirectory(tableName));
+ Long id = snapshotManager.latestSnapshotId();
+ return id == null ? null : id >= snapshotId ?
snapshotManager.snapshot(snapshotId) : null;
+ }
+
protected String toWarehouse(String path) {
return path;
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 915f66015..8e3840eca 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -36,6 +37,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -121,42 +123,42 @@ public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
}
@Test
- public void testAutoCompaction() {
+ public void testNoCompactionInBatchMode() {
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' =
'2')");
batchSql("ALTER TABLE append_table SET
('compaction.early-max.file-num' = '4')");
- assertAutoCompaction(
+ assertExecuteExpected(
"INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')",
1L,
Snapshot.CommitKind.APPEND);
- assertAutoCompaction(
+ assertExecuteExpected(
"INSERT INTO append_table VALUES (3, 'CCC'), (4, 'DDD')",
2L,
Snapshot.CommitKind.APPEND);
- assertAutoCompaction(
+ assertExecuteExpected(
"INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB'), (3,
'CCC'), (4, 'DDD')",
3L,
Snapshot.CommitKind.APPEND);
- assertAutoCompaction(
+ assertExecuteExpected(
"INSERT INTO append_table VALUES (5, 'EEE'), (6, 'FFF')",
4L,
Snapshot.CommitKind.APPEND);
- assertAutoCompaction(
+ assertExecuteExpected(
"INSERT INTO append_table VALUES (7, 'HHH'), (8, 'III')",
- 6L,
- Snapshot.CommitKind.COMPACT);
- assertAutoCompaction(
+ 5L,
+ Snapshot.CommitKind.APPEND);
+ assertExecuteExpected(
"INSERT INTO append_table VALUES (9, 'JJJ'), (10, 'KKK')",
- 7L,
+ 6L,
Snapshot.CommitKind.APPEND);
- assertAutoCompaction(
+ assertExecuteExpected(
"INSERT INTO append_table VALUES (11, 'LLL'), (12, 'MMM')",
- 8L,
+ 7L,
Snapshot.CommitKind.APPEND);
- assertAutoCompaction(
+ assertExecuteExpected(
"INSERT INTO append_table VALUES (13, 'NNN'), (14, 'OOO')",
- 10L,
- Snapshot.CommitKind.COMPACT);
+ 8L,
+ Snapshot.CommitKind.APPEND);
List<Row> rows = batchSql("SELECT * FROM append_table");
assertThat(rows.size()).isEqualTo(18);
@@ -182,6 +184,28 @@ public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
Row.of(14, "OOO"));
}
+ @Test
+ public void testCompactionInStreamingMode() throws Exception {
+ batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' =
'2')");
+ batchSql("ALTER TABLE append_table SET
('compaction.early-max.file-num' = '4')");
+
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(500));
+ sEnv.executeSql(
+ "CREATE TEMPORARY TABLE Orders_in (\n"
+ + " f0 INT,\n"
+ + " f1 STRING\n"
+ + ") WITH (\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'rows-per-second' = '1',\n"
+ + " 'number-of-rows' = '4'\n"
+ + ")");
+
+ assertStreamingHasCompact("INSERT INTO append_table SELECT * FROM
Orders_in", 60000);
+
+ List<Row> rows = batchSql("SELECT * FROM append_table");
+ assertThat(rows.size()).isEqualTo(4);
+ }
+
@Test
public void testRejectDelete() {
testRejectChanges(RowKind.DELETE);
@@ -308,11 +332,33 @@ public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
.hasRootCauseMessage("Append only writer can not accept row
with RowKind %s", kind);
}
- private void assertAutoCompaction(
+ private void assertExecuteExpected(
String sql, long expectedSnapshotId, Snapshot.CommitKind
expectedCommitKind) {
batchSql(sql);
Snapshot snapshot = findLatestSnapshot("append_table");
assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
}
+
+ private void assertStreamingHasCompact(String sql, long timeout) throws
Exception {
+ long start = System.currentTimeMillis();
+ long currentId = 1;
+ sEnv.executeSql(sql);
+ Snapshot snapshot;
+ while (true) {
+ snapshot = findSnapshot("append_table", currentId);
+ if (snapshot != null) {
+ if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ break;
+ }
+ currentId++;
+ }
+ long now = System.currentTimeMillis();
+ if (now - start > timeout) {
+ throw new RuntimeException(
+ "Time up for streaming execute, don't get expected
result.");
+ }
+ Thread.sleep(1000);
+ }
+ }
}