This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fa7623e99 [flink] Introduce bypass append compact coordinator and 
worker (#3936)
fa7623e99 is described below

commit fa7623e992509d84f5c05b61327fbc5d16c6d999
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 13:08:23 2024 +0800

    [flink] Introduce bypass append compact coordinator and worker (#3936)
---
 .../flink/UnawareBucketAppendOnlyTableITCase.java  | 88 ++++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      | 28 +++++++
 paimon-flink/paimon-flink-1.19/pom.xml             | 51 +++++++++++++
 .../flink/UnawareBucketAppendOnlyTableITCase.java  | 88 ++++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      | 28 +++++++
 .../UnawareBucketCompactionTopoBuilder.java        | 17 +----
 .../sink/AppendBypassCompactWorkerOperator.java    | 49 ++++++++++++
 ...rator.java => AppendCompactWorkerOperator.java} | 21 ++----
 ...endOnlySingleTableCompactionWorkerOperator.java | 72 +-----------------
 .../paimon/flink/sink/UnawareBucketSink.java       | 22 ++++--
 .../source/AppendBypassCoordinateOperator.java     | 88 ++++++++++++++++++++++
 .../flink/source/BucketUnawareCompactSource.java   | 10 +--
 .../flink/UnawareBucketAppendOnlyTableITCase.java  |  2 +
 13 files changed, 450 insertions(+), 114 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
new file mode 100644
index 000000000..b8cec3fda
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.Snapshot;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for append-only managed unaware-bucket table. */
+public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) 
WITH ('bucket' = '-1')");
+    }
+
+    @Test
+    public void testCompactionInStreamingMode() throws Exception {
+        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = 
'2')");
+        batchSql("ALTER TABLE append_table SET 
('compaction.early-max.file-num' = '4')");
+        batchSql("ALTER TABLE append_table SET 
('continuous.discovery-interval' = '1 s')");
+
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, 
Duration.ofMillis(500));
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE Orders_in (\n"
+                        + "    f0        INT,\n"
+                        + "    f1        STRING\n"
+                        + ") WITH (\n"
+                        + "    'connector' = 'datagen',\n"
+                        + "    'rows-per-second' = '1',\n"
+                        + "    'number-of-rows' = '10'\n"
+                        + ")");
+
+        assertStreamingHasCompact("INSERT INTO append_table SELECT * FROM 
Orders_in", 60000);
+        // ensure data gen finished
+        Thread.sleep(5000);
+
+        List<Row> rows = batchSql("SELECT * FROM append_table");
+        assertThat(rows.size()).isEqualTo(10);
+    }
+
+    private void assertStreamingHasCompact(String sql, long timeout) throws 
Exception {
+        long start = System.currentTimeMillis();
+        long currentId = 1;
+        sEnv.executeSql(sql);
+        Snapshot snapshot;
+        while (true) {
+            snapshot = findSnapshot("append_table", currentId);
+            if (snapshot != null) {
+                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                    break;
+                }
+                currentId++;
+            }
+            long now = System.currentTimeMillis();
+            if (now - start > timeout) {
+                throw new RuntimeException(
+                        "Time up for streaming execute, don't get expected 
result.");
+            }
+            Thread.sleep(1000);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/resources/log4j2-test.properties 
b/paimon-flink/paimon-flink-1.18/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..1b3980d15
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/paimon-flink/paimon-flink-1.19/pom.xml 
b/paimon-flink/paimon-flink-1.19/pom.xml
index 301cb0a10..afbf72c75 100644
--- a/paimon-flink/paimon-flink-1.19/pom.xml
+++ b/paimon-flink/paimon-flink-1.19/pom.xml
@@ -42,6 +42,12 @@ under the License.
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-flink-common</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -55,6 +61,51 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.21</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
new file mode 100644
index 000000000..b8cec3fda
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.Snapshot;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for append-only managed unaware-bucket table. */
+public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) 
WITH ('bucket' = '-1')");
+    }
+
+    @Test
+    public void testCompactionInStreamingMode() throws Exception {
+        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = 
'2')");
+        batchSql("ALTER TABLE append_table SET 
('compaction.early-max.file-num' = '4')");
+        batchSql("ALTER TABLE append_table SET 
('continuous.discovery-interval' = '1 s')");
+
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, 
Duration.ofMillis(500));
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE Orders_in (\n"
+                        + "    f0        INT,\n"
+                        + "    f1        STRING\n"
+                        + ") WITH (\n"
+                        + "    'connector' = 'datagen',\n"
+                        + "    'rows-per-second' = '1',\n"
+                        + "    'number-of-rows' = '10'\n"
+                        + ")");
+
+        assertStreamingHasCompact("INSERT INTO append_table SELECT * FROM 
Orders_in", 60000);
+        // ensure data gen finished
+        Thread.sleep(5000);
+
+        List<Row> rows = batchSql("SELECT * FROM append_table");
+        assertThat(rows.size()).isEqualTo(10);
+    }
+
+    private void assertStreamingHasCompact(String sql, long timeout) throws 
Exception {
+        long start = System.currentTimeMillis();
+        long currentId = 1;
+        sEnv.executeSql(sql);
+        Snapshot snapshot;
+        while (true) {
+            snapshot = findSnapshot("append_table", currentId);
+            if (snapshot != null) {
+                if (snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                    break;
+                }
+                currentId++;
+            }
+            long now = System.currentTimeMillis();
+            if (now - start > timeout) {
+                throw new RuntimeException(
+                        "Time up for streaming execute, don't get expected 
result.");
+            }
+            Thread.sleep(1000);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.19/src/test/resources/log4j2-test.properties 
b/paimon-flink/paimon-flink-1.19/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..1b3980d15
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.19/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index ee44a72d4..b4d61c265 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.compact;
 
 import org.apache.paimon.append.AppendOnlyCompactionTask;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
 import org.apache.paimon.options.Options;
@@ -71,27 +70,17 @@ public class UnawareBucketCompactionTopoBuilder {
 
     public void build() {
         // build source from UnawareSourceFunction
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(false);
+        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
 
         // from source, construct the full flink job
         sinkFromSource(source);
     }
 
-    public DataStream<Committable> fetchUncommitted(String commitUser) {
-        DataStreamSource<AppendOnlyCompactionTask> source = buildSource(true);
-
-        // rebalance input to default or assigned parallelism
-        DataStream<AppendOnlyCompactionTask> rebalanced = 
rebalanceInput(source);
-
-        return new UnawareBucketCompactionSink(table)
-                .doWrite(rebalanced, commitUser, rebalanced.getParallelism());
-    }
-
-    private DataStreamSource<AppendOnlyCompactionTask> buildSource(boolean 
emitMaxWatermark) {
+    private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
         long scanInterval = 
table.coreOptions().continuousDiscoveryInterval().toMillis();
         BucketUnawareCompactSource source =
                 new BucketUnawareCompactSource(
-                        table, isContinuous, scanInterval, partitionPredicate, 
emitMaxWatermark);
+                        table, isContinuous, scanInterval, partitionPredicate);
 
         return BucketUnawareCompactSource.buildSource(env, source, 
isContinuous, tableIdentifier);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java
new file mode 100644
index 000000000..6bd45fea7
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendBypassCompactWorkerOperator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+/** An {@link AppendCompactWorkerOperator} that passes Committable inputs through to its output. */
+public class AppendBypassCompactWorkerOperator
+        extends AppendCompactWorkerOperator<Either<Committable, 
AppendOnlyCompactionTask>> {
+
+    public AppendBypassCompactWorkerOperator(FileStoreTable table, String 
commitUser) {
+        super(table, commitUser);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+    }
+
+    @Override
+    public void processElement(StreamRecord<Either<Committable, 
AppendOnlyCompactionTask>> element)
+            throws Exception {
+        if (element.getValue().isLeft()) {
+            output.collect(new StreamRecord<>(element.getValue().left()));
+        } else {
+            unawareBucketCompactor.processElement(element.getValue().right());
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
similarity index 79%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
index 8b41d7759..cd0708f79 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java
@@ -27,7 +27,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,23 +38,22 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link
- * BucketUnawareCompactSource} for compacting single unaware bucket tables in 
divided mode.
+ * An abstract operator to execute {@link AppendOnlyCompactionTask}s passed from {@link
+ * BucketUnawareCompactSource} for compacting a table. This operator always runs in async mode.
  */
-public class AppendOnlySingleTableCompactionWorkerOperator
-        extends PrepareCommitOperator<AppendOnlyCompactionTask, Committable> {
+public abstract class AppendCompactWorkerOperator<IN>
+        extends PrepareCommitOperator<IN, Committable> {
 
-    private static final Logger LOG =
-            
LoggerFactory.getLogger(AppendOnlySingleTableCompactionWorkerOperator.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(AppendCompactWorkerOperator.class);
 
     private final FileStoreTable table;
     private final String commitUser;
 
-    private transient UnawareBucketCompactor unawareBucketCompactor;
+    protected transient UnawareBucketCompactor unawareBucketCompactor;
 
     private transient ExecutorService lazyCompactExecutor;
 
-    public AppendOnlySingleTableCompactionWorkerOperator(FileStoreTable table, 
String commitUser) {
+    public AppendCompactWorkerOperator(FileStoreTable table, String 
commitUser) {
         super(Options.fromMap(table.options()));
         this.table = table;
         this.commitUser = commitUser;
@@ -79,11 +77,6 @@ public class AppendOnlySingleTableCompactionWorkerOperator
         return this.unawareBucketCompactor.prepareCommit(waitCompaction, 
checkpointId);
     }
 
-    @Override
-    public void processElement(StreamRecord<AppendOnlyCompactionTask> element) 
throws Exception {
-        this.unawareBucketCompactor.processElement(element.getValue());
-    }
-
     private ExecutorService workerExecutor() {
         if (lazyCompactExecutor == null) {
             lazyCompactExecutor =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
index 8b41d7759..b43af93e8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
@@ -18,93 +18,25 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.append.AppendOnlyCompactionTask;
-import org.apache.paimon.flink.compact.UnawareBucketCompactor;
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.utils.ExecutorThreadFactory;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link
  * BucketUnawareCompactSource} for compacting single unaware bucket tables in 
divided mode.
  */
 public class AppendOnlySingleTableCompactionWorkerOperator
-        extends PrepareCommitOperator<AppendOnlyCompactionTask, Committable> {
-
-    private static final Logger LOG =
-            
LoggerFactory.getLogger(AppendOnlySingleTableCompactionWorkerOperator.class);
-
-    private final FileStoreTable table;
-    private final String commitUser;
-
-    private transient UnawareBucketCompactor unawareBucketCompactor;
-
-    private transient ExecutorService lazyCompactExecutor;
+        extends AppendCompactWorkerOperator<AppendOnlyCompactionTask> {
 
     public AppendOnlySingleTableCompactionWorkerOperator(FileStoreTable table, 
String commitUser) {
-        super(Options.fromMap(table.options()));
-        this.table = table;
-        this.commitUser = commitUser;
-    }
-
-    @VisibleForTesting
-    Iterable<Future<CommitMessage>> result() {
-        return unawareBucketCompactor.result();
-    }
-
-    @Override
-    public void open() throws Exception {
-        LOG.debug("Opened a append-only table compaction worker.");
-        this.unawareBucketCompactor =
-                new UnawareBucketCompactor(table, commitUser, 
this::workerExecutor);
-    }
-
-    @Override
-    protected List<Committable> prepareCommit(boolean waitCompaction, long 
checkpointId)
-            throws IOException {
-        return this.unawareBucketCompactor.prepareCommit(waitCompaction, 
checkpointId);
+        super(table, commitUser);
     }
 
     @Override
     public void processElement(StreamRecord<AppendOnlyCompactionTask> element) 
throws Exception {
         this.unawareBucketCompactor.processElement(element.getValue());
     }
-
-    private ExecutorService workerExecutor() {
-        if (lazyCompactExecutor == null) {
-            lazyCompactExecutor =
-                    Executors.newSingleThreadScheduledExecutor(
-                            new ExecutorThreadFactory(
-                                    Thread.currentThread().getName()
-                                            + "-append-only-compact-worker"));
-        }
-        return lazyCompactExecutor;
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (lazyCompactExecutor != null) {
-            // ignore runnable tasks in queue
-            lazyCompactExecutor.shutdownNow();
-            if (!lazyCompactExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
-                LOG.warn(
-                        "Executors shutdown timeout, there may be some files 
aren't deleted correctly");
-            }
-            this.unawareBucketCompactor.close();
-        }
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
index ed4bca7b2..c4ac99906 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
@@ -18,10 +18,11 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
+import org.apache.paimon.flink.source.AppendBypassCoordinateOperator;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
@@ -69,12 +70,19 @@ public abstract class UnawareBucketSink<T> extends 
FlinkWriteSink<T> {
                         == RuntimeExecutionMode.STREAMING;
         // if enable compaction, we need to add compaction topology to this job
         if (enableCompaction && isStreamingMode && !boundedInput) {
-            // if streaming mode with bounded input, we disable compaction 
topology
-            UnawareBucketCompactionTopoBuilder builder =
-                    new UnawareBucketCompactionTopoBuilder(
-                            input.getExecutionEnvironment(), table.name(), 
table);
-            builder.withContinuousMode(true);
-            written = 
written.union(builder.fetchUncommitted(initialCommitUser));
+            written =
+                    written.transform(
+                                    "Compact Coordinator: " + table.name(),
+                                    new EitherTypeInfo<>(
+                                            new CommittableTypeInfo(),
+                                            new CompactionTaskTypeInfo()),
+                                    new 
AppendBypassCoordinateOperator<>(table))
+                            .forceNonParallel()
+                            .transform(
+                                    "Compact Worker: " + table.name(),
+                                    new CommittableTypeInfo(),
+                                    new 
AppendBypassCompactWorkerOperator(table, initialCommitUser))
+                            .setParallelism(written.getParallelism());
         }
 
         return written;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
new file mode 100644
index 000000000..e07f27d10
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendBypassCoordinateOperator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A {@link OneInputStreamOperator} that forwards incoming commit messages and 
periodically emits
+ * the append-only compaction tasks planned by the coordinator to downstream operators.
+ */
+public class AppendBypassCoordinateOperator<CommitT>
+        extends AbstractStreamOperator<Either<CommitT, 
AppendOnlyCompactionTask>>
+        implements OneInputStreamOperator<CommitT, Either<CommitT, 
AppendOnlyCompactionTask>>,
+                ProcessingTimeService.ProcessingTimeCallback {
+
+    private final FileStoreTable table;
+
+    private transient long intervalMs;
+    private transient AppendOnlyTableCompactionCoordinator coordinator;
+
+    public AppendBypassCoordinateOperator(FileStoreTable table) {
+        this.table = table;
+        this.chainingStrategy = ChainingStrategy.NEVER;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        checkArgument(
+                getRuntimeContext().getNumberOfParallelSubtasks() == 1,
+                "Compaction Coordinator parallelism in paimon MUST be one.");
+        this.coordinator = new AppendOnlyTableCompactionCoordinator(table, 
true, null);
+        this.intervalMs = 
table.coreOptions().continuousDiscoveryInterval().toMillis();
+
+        long now = getProcessingTimeService().getCurrentProcessingTime();
+        getProcessingTimeService().registerTimer(now, this);
+    }
+
+    @Override
+    public void onProcessingTime(long time) {
+        while (true) {
+            List<AppendOnlyCompactionTask> tasks = coordinator.run();
+            for (AppendOnlyCompactionTask task : tasks) {
+                output.collect(new StreamRecord<>(Either.Right(task)));
+            }
+
+            if (tasks.isEmpty()) {
+                break;
+            }
+        }
+
+        getProcessingTimeService().registerTimer(time + this.intervalMs, this);
+    }
+
+    @Override
+    public void processElement(StreamRecord<CommitT> record) throws Exception {
+        output.collect(new StreamRecord<>(Either.Left(record.getValue())));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index 936b2cd2c..7926fa60a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,19 +60,16 @@ public class BucketUnawareCompactSource extends 
RichSourceFunction<AppendOnlyCom
     private transient AppendOnlyTableCompactionCoordinator 
compactionCoordinator;
     private transient SourceContext<AppendOnlyCompactionTask> ctx;
     private volatile boolean isRunning = true;
-    private final boolean emitMaxWatermark;
 
     public BucketUnawareCompactSource(
             FileStoreTable table,
             boolean isStreaming,
             long scanInterval,
-            @Nullable Predicate filter,
-            boolean emitMaxWatermark) {
+            @Nullable Predicate filter) {
         this.table = table;
         this.streaming = isStreaming;
         this.scanInterval = scanInterval;
         this.filter = filter;
-        this.emitMaxWatermark = emitMaxWatermark;
     }
 
     @Override
@@ -98,10 +94,6 @@ public class BucketUnawareCompactSource extends 
RichSourceFunction<AppendOnlyCom
                     List<AppendOnlyCompactionTask> tasks = 
compactionCoordinator.run();
                     isEmpty = tasks.isEmpty();
                     tasks.forEach(ctx::collect);
-
-                    if (emitMaxWatermark) {
-                        ctx.emitWatermark(Watermark.MAX_WATERMARK);
-                    }
                 } catch (EndOfScanException esf) {
                     LOG.info("Catching EndOfStreamException, the stream is 
finished.");
                     return;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 55bd89b01..5766510f7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -189,6 +189,7 @@ public class UnawareBucketAppendOnlyTableITCase extends 
CatalogITCaseBase {
     public void testCompactionInStreamingMode() throws Exception {
         batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = 
'2')");
         batchSql("ALTER TABLE append_table SET 
('compaction.early-max.file-num' = '4')");
+        batchSql("ALTER TABLE append_table SET 
('continuous.discovery-interval' = '1 s')");
 
         sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, 
Duration.ofMillis(500));
         sEnv.executeSql(
@@ -213,6 +214,7 @@ public class UnawareBucketAppendOnlyTableITCase extends 
CatalogITCaseBase {
     public void testCompactionInStreamingModeWithMaxWatermark() throws 
Exception {
         batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = 
'2')");
         batchSql("ALTER TABLE append_table SET 
('compaction.early-max.file-num' = '4')");
+        batchSql("ALTER TABLE append_table SET 
('continuous.discovery-interval' = '1 s')");
 
         sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, 
Duration.ofMillis(500));
         sEnv.executeSql(


Reply via email to