This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fa7623e99 [flink] Introduce bypass append compact coordinator and
worker (#3936)
fa7623e99 is described below
commit fa7623e992509d84f5c05b61327fbc5d16c6d999
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 13:08:23 2024 +0800
[flink] Introduce bypass append compact coordinator and worker (#3936)
---
.../flink/UnawareBucketAppendOnlyTableITCase.java | 88 ++++++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 28 +++++++
paimon-flink/paimon-flink-1.19/pom.xml | 51 +++++++++++++
.../flink/UnawareBucketAppendOnlyTableITCase.java | 88 ++++++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 28 +++++++
.../UnawareBucketCompactionTopoBuilder.java | 17 +----
.../sink/AppendBypassCompactWorkerOperator.java | 49 ++++++++++++
...rator.java => AppendCompactWorkerOperator.java} | 21 ++----
...endOnlySingleTableCompactionWorkerOperator.java | 72 +-----------------
.../paimon/flink/sink/UnawareBucketSink.java | 22 ++++--
.../source/AppendBypassCoordinateOperator.java | 88 ++++++++++++++++++++++
.../flink/source/BucketUnawareCompactSource.java | 10 +--
.../flink/UnawareBucketAppendOnlyTableITCase.java | 2 +
13 files changed, 450 insertions(+), 114 deletions(-)
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
new file mode 100644
index 000000000..b8cec3fda
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.Snapshot;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for append-only managed unaware-bucket table. */
+public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING)
WITH ('bucket' = '-1')");
+ }
+
+ @Test
+ public void testCompactionInStreamingMode() throws Exception {
+ batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' =
'2')");
+ batchSql("ALTER TABLE append_table SET
('compaction.early-max.file-num' = '4')");
+ batchSql("ALTER TABLE append_table SET
('continuous.discovery-interval' = '1 s')");
+
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(500));
+ sEnv.executeSql(
+ "CREATE TEMPORARY TABLE Orders_in (\n"
+ + " f0 INT,\n"
+ + " f1 STRING\n"
+ + ") WITH (\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'rows-per-second' = '1',\n"
+ + " 'number-of-rows' = '10'\n"
+ + ")");
+
+ assertStreamingHasCompact("INSERT INTO append_table SELECT * FROM
Orders_in", 60000);
+ // ensure data gen finished
+ Thread.sleep(5000);
+
+ List<Row> rows = batchSql("SELECT * FROM append_table");
+ assertThat(rows.size()).isEqualTo(10);
+ }
+
+ private void assertStreamingHasCompact(String sql, long timeout) throws
Exception {
+ long start = System.currentTimeMillis();
+ long currentId = 1;
+ sEnv.executeSql(sql);
+ Snapshot snapshot;
+ while (true) {
+ snapshot = findSnapshot("append_table", currentId);
+ if (snapshot != null) {
+ if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ break;
+ }
+ currentId++;
+ }
+ long now = System.currentTimeMillis();
+ if (now - start > timeout) {
+ throw new RuntimeException(
+ "Time up for streaming execute, don't get expected
result.");
+ }
+ Thread.sleep(1000);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/resources/log4j2-test.properties
b/paimon-flink/paimon-flink-1.18/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..1b3980d15
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/paimon-flink/paimon-flink-1.19/pom.xml
b/paimon-flink/paimon-flink-1.19/pom.xml
index 301cb0a10..afbf72c75 100644
--- a/paimon-flink/paimon-flink-1.19/pom.xml
+++ b/paimon-flink/paimon-flink-1.19/pom.xml
@@ -42,6 +42,12 @@ under the License.
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-common</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -55,6 +61,51 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.21</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
new file mode 100644
index 000000000..b8cec3fda
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.Snapshot;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for append-only managed unaware-bucket table. */
+public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING)
WITH ('bucket' = '-1')");
+ }
+
+ @Test
+ public void testCompactionInStreamingMode() throws Exception {
+ batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' =
'2')");
+ batchSql("ALTER TABLE append_table SET
('compaction.early-max.file-num' = '4')");
+ batchSql("ALTER TABLE append_table SET
('continuous.discovery-interval' = '1 s')");
+
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(500));
+ sEnv.executeSql(
+ "CREATE TEMPORARY TABLE Orders_in (\n"
+ + " f0 INT,\n"
+ + " f1 STRING\n"
+ + ") WITH (\n"
+ + " 'connector' = 'datagen',\n"
+ + " 'rows-per-second' = '1',\n"
+ + " 'number-of-rows' = '10'\n"
+ + ")");
+
+ assertStreamingHasCompact("INSERT INTO append_table SELECT * FROM
Orders_in", 60000);
+ // ensure data gen finished
+ Thread.sleep(5000);
+
+ List<Row> rows = batchSql("SELECT * FROM append_table");
+ assertThat(rows.size()).isEqualTo(10);
+ }
+
+ private void assertStreamingHasCompact(String sql, long timeout) throws
Exception {
+ long start = System.currentTimeMillis();
+ long currentId = 1;
+ sEnv.executeSql(sql);
+ Snapshot snapshot;
+ while (true) {
+ snapshot = findSnapshot("append_table", currentId);
+ if (snapshot != null) {
+ if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ break;
+ }
+ currentId++;
+ }
+ long now = System.currentTimeMillis();
+ if (now - start > timeout) {
+ throw new RuntimeException(
+ "Time up for streaming execute, don't get expected
result.");
+ }
+ Thread.sleep(1000);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-1.19/src/test/resources/log4j2-test.properties
b/paimon-flink/paimon-flink-1.19/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..1b3980d15
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.19/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index ee44a72d4..b4d61c265 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.compact;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
import org.apache.paimon.options.Options;
@@ -71,27 +70,17 @@ public class UnawareBucketCompactionTopoBuilder {
public void build() {
// build source from UnawareSourceFunction
- DataStreamSource<AppendOnlyCompactionTask> source = buildSource(false);
+ DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
// from source, construct the full flink job
sinkFromSource(source);
}
- public DataStream<Committable> fetchUncommitted(String commitUser) {
- DataStreamSource<AppendOnlyCompactionTask> source = buildSource(true);
-
- // rebalance input to default or assigned parallelism
- DataStream<AppendOnlyCompactionTask> rebalanced =
rebalanceInput(source);
-
- return new UnawareBucketCompactionSink(table)
- .doWrite(rebalanced, commitUser, rebalanced.getParallelism());
- }
-
- private DataStreamSource<AppendOnlyCompactionTask> buildSource(boolean
emitMaxWatermark) {
+ private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
long scanInterval =
table.coreOptions().continuousDiscoveryInterval().toMillis();
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
- table, isContinuous, scanInterval, partitionPredicate,
emitMaxWatermark);
+ table, isContinuous, scanInterval, partitionPredicate);
return BucketUnawareCompactSource.buildSource(env, source,
isContinuous, tableIdentifier);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java
new file mode 100644
index 000000000..6bd45fea7
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+/** An {@link AppendCompactWorkerOperator} that bypasses Committable inputs. */
+public class AppendBypassCompactWorkerOperator
+ extends AppendCompactWorkerOperator<Either<Committable,
AppendOnlyCompactionTask>> {
+
+ public AppendBypassCompactWorkerOperator(FileStoreTable table, String
commitUser) {
+ super(table, commitUser);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ }
+
+ @Override
+ public void processElement(StreamRecord<Either<Committable,
AppendOnlyCompactionTask>> element)
+ throws Exception {
+ if (element.getValue().isLeft()) {
+ output.collect(new StreamRecord<>(element.getValue().left()));
+ } else {
+ unawareBucketCompactor.processElement(element.getValue().right());
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
similarity index 79%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
index 8b41d7759..cd0708f79 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
@@ -27,7 +27,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,23 +38,22 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
- * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link
- * BucketUnawareCompactSource} for compacting single unaware bucket tables in
divided mode.
+ * An abstract operator that executes {@link AppendOnlyCompactionTask}s passed
from {@link
+ * BucketUnawareCompactSource} to compact a table. This operator is always
in async mode.
*/
-public class AppendOnlySingleTableCompactionWorkerOperator
- extends PrepareCommitOperator<AppendOnlyCompactionTask, Committable> {
+public abstract class AppendCompactWorkerOperator<IN>
+ extends PrepareCommitOperator<IN, Committable> {
- private static final Logger LOG =
-
LoggerFactory.getLogger(AppendOnlySingleTableCompactionWorkerOperator.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(AppendCompactWorkerOperator.class);
private final FileStoreTable table;
private final String commitUser;
- private transient UnawareBucketCompactor unawareBucketCompactor;
+ protected transient UnawareBucketCompactor unawareBucketCompactor;
private transient ExecutorService lazyCompactExecutor;
- public AppendOnlySingleTableCompactionWorkerOperator(FileStoreTable table,
String commitUser) {
+ public AppendCompactWorkerOperator(FileStoreTable table, String
commitUser) {
super(Options.fromMap(table.options()));
this.table = table;
this.commitUser = commitUser;
@@ -79,11 +77,6 @@ public class AppendOnlySingleTableCompactionWorkerOperator
return this.unawareBucketCompactor.prepareCommit(waitCompaction,
checkpointId);
}
- @Override
- public void processElement(StreamRecord<AppendOnlyCompactionTask> element)
throws Exception {
- this.unawareBucketCompactor.processElement(element.getValue());
- }
-
private ExecutorService workerExecutor() {
if (lazyCompactExecutor == null) {
lazyCompactExecutor =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
index 8b41d7759..b43af93e8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
@@ -18,93 +18,25 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.AppendOnlyCompactionTask;
-import org.apache.paimon.flink.compact.UnawareBucketCompactor;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
-import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
/**
* Operator to execute {@link AppendOnlyCompactionTask} passed from {@link
* BucketUnawareCompactSource} for compacting single unaware bucket tables in
divided mode.
*/
public class AppendOnlySingleTableCompactionWorkerOperator
- extends PrepareCommitOperator<AppendOnlyCompactionTask, Committable> {
-
- private static final Logger LOG =
-
LoggerFactory.getLogger(AppendOnlySingleTableCompactionWorkerOperator.class);
-
- private final FileStoreTable table;
- private final String commitUser;
-
- private transient UnawareBucketCompactor unawareBucketCompactor;
-
- private transient ExecutorService lazyCompactExecutor;
+ extends AppendCompactWorkerOperator<AppendOnlyCompactionTask> {
public AppendOnlySingleTableCompactionWorkerOperator(FileStoreTable table,
String commitUser) {
- super(Options.fromMap(table.options()));
- this.table = table;
- this.commitUser = commitUser;
- }
-
- @VisibleForTesting
- Iterable<Future<CommitMessage>> result() {
- return unawareBucketCompactor.result();
- }
-
- @Override
- public void open() throws Exception {
- LOG.debug("Opened a append-only table compaction worker.");
- this.unawareBucketCompactor =
- new UnawareBucketCompactor(table, commitUser,
this::workerExecutor);
- }
-
- @Override
- protected List<Committable> prepareCommit(boolean waitCompaction, long
checkpointId)
- throws IOException {
- return this.unawareBucketCompactor.prepareCommit(waitCompaction,
checkpointId);
+ super(table, commitUser);
}
@Override
public void processElement(StreamRecord<AppendOnlyCompactionTask> element)
throws Exception {
this.unawareBucketCompactor.processElement(element.getValue());
}
-
- private ExecutorService workerExecutor() {
- if (lazyCompactExecutor == null) {
- lazyCompactExecutor =
- Executors.newSingleThreadScheduledExecutor(
- new ExecutorThreadFactory(
- Thread.currentThread().getName()
- + "-append-only-compact-worker"));
- }
- return lazyCompactExecutor;
- }
-
- @Override
- public void close() throws Exception {
- if (lazyCompactExecutor != null) {
- // ignore runnable tasks in queue
- lazyCompactExecutor.shutdownNow();
- if (!lazyCompactExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
- LOG.warn(
- "Executors shutdown timeout, there may be some files
aren't deleted correctly");
- }
- this.unawareBucketCompactor.close();
- }
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
index ed4bca7b2..c4ac99906 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
@@ -18,10 +18,11 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
+import org.apache.paimon.flink.source.AppendBypassCoordinateOperator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -69,12 +70,19 @@ public abstract class UnawareBucketSink<T> extends
FlinkWriteSink<T> {
== RuntimeExecutionMode.STREAMING;
// if enable compaction, we need to add compaction topology to this job
if (enableCompaction && isStreamingMode && !boundedInput) {
- // if streaming mode with bounded input, we disable compaction
topology
- UnawareBucketCompactionTopoBuilder builder =
- new UnawareBucketCompactionTopoBuilder(
- input.getExecutionEnvironment(), table.name(),
table);
- builder.withContinuousMode(true);
- written =
written.union(builder.fetchUncommitted(initialCommitUser));
+ written =
+ written.transform(
+ "Compact Coordinator: " + table.name(),
+ new EitherTypeInfo<>(
+ new CommittableTypeInfo(),
+ new CompactionTaskTypeInfo()),
+ new
AppendBypassCoordinateOperator<>(table))
+ .forceNonParallel()
+ .transform(
+ "Compact Worker: " + table.name(),
+ new CommittableTypeInfo(),
+ new
AppendBypassCompactWorkerOperator(table, initialCommitUser))
+ .setParallelism(written.getParallelism());
}
return written;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
new file mode 100644
index 000000000..e07f27d10
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A {@link OneInputStreamOperator} that forwards incoming commit messages and
periodically emits
+ * append compaction tasks from the coordinator to downstream operators.
+ */
+public class AppendBypassCoordinateOperator<CommitT>
+ extends AbstractStreamOperator<Either<CommitT,
AppendOnlyCompactionTask>>
+ implements OneInputStreamOperator<CommitT, Either<CommitT,
AppendOnlyCompactionTask>>,
+ ProcessingTimeService.ProcessingTimeCallback {
+
+ private final FileStoreTable table;
+
+ private transient long intervalMs;
+ private transient AppendOnlyTableCompactionCoordinator coordinator;
+
+ public AppendBypassCoordinateOperator(FileStoreTable table) {
+ this.table = table;
+ this.chainingStrategy = ChainingStrategy.NEVER;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ checkArgument(
+ getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+ "Compaction Coordinator parallelism in paimon MUST be one.");
+ this.coordinator = new AppendOnlyTableCompactionCoordinator(table,
true, null);
+ this.intervalMs =
table.coreOptions().continuousDiscoveryInterval().toMillis();
+
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ getProcessingTimeService().registerTimer(now, this);
+ }
+
+ @Override
+ public void onProcessingTime(long time) {
+ while (true) {
+ List<AppendOnlyCompactionTask> tasks = coordinator.run();
+ for (AppendOnlyCompactionTask task : tasks) {
+ output.collect(new StreamRecord<>(Either.Right(task)));
+ }
+
+ if (tasks.isEmpty()) {
+ break;
+ }
+ }
+
+ getProcessingTimeService().registerTimer(time + this.intervalMs, this);
+ }
+
+ @Override
+ public void processElement(StreamRecord<CommitT> record) throws Exception {
+ output.collect(new StreamRecord<>(Either.Left(record.getValue())));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index 936b2cd2c..7926fa60a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -32,7 +32,6 @@ import
org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,19 +60,16 @@ public class BucketUnawareCompactSource extends
RichSourceFunction<AppendOnlyCom
private transient AppendOnlyTableCompactionCoordinator
compactionCoordinator;
private transient SourceContext<AppendOnlyCompactionTask> ctx;
private volatile boolean isRunning = true;
- private final boolean emitMaxWatermark;
public BucketUnawareCompactSource(
FileStoreTable table,
boolean isStreaming,
long scanInterval,
- @Nullable Predicate filter,
- boolean emitMaxWatermark) {
+ @Nullable Predicate filter) {
this.table = table;
this.streaming = isStreaming;
this.scanInterval = scanInterval;
this.filter = filter;
- this.emitMaxWatermark = emitMaxWatermark;
}
@Override
@@ -98,10 +94,6 @@ public class BucketUnawareCompactSource extends
RichSourceFunction<AppendOnlyCom
List<AppendOnlyCompactionTask> tasks =
compactionCoordinator.run();
isEmpty = tasks.isEmpty();
tasks.forEach(ctx::collect);
-
- if (emitMaxWatermark) {
- ctx.emitWatermark(Watermark.MAX_WATERMARK);
- }
} catch (EndOfScanException esf) {
LOG.info("Catching EndOfStreamException, the stream is
finished.");
return;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 55bd89b01..5766510f7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -189,6 +189,7 @@ public class UnawareBucketAppendOnlyTableITCase extends
CatalogITCaseBase {
public void testCompactionInStreamingMode() throws Exception {
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' =
'2')");
batchSql("ALTER TABLE append_table SET
('compaction.early-max.file-num' = '4')");
+ batchSql("ALTER TABLE append_table SET
('continuous.discovery-interval' = '1 s')");
sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(500));
sEnv.executeSql(
@@ -213,6 +214,7 @@ public class UnawareBucketAppendOnlyTableITCase extends
CatalogITCaseBase {
public void testCompactionInStreamingModeWithMaxWatermark() throws
Exception {
batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' =
'2')");
batchSql("ALTER TABLE append_table SET
('compaction.early-max.file-num' = '4')");
+ batchSql("ALTER TABLE append_table SET
('continuous.discovery-interval' = '1 s')");
sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(500));
sEnv.executeSql(