This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eadf1cf3 [flink][mysql-cdc] All FileStoreWrites generated in database synchronization job should use the same compaction executor service (#1727)
4eadf1cf3 is described below

commit 4eadf1cf3609e098d2a756c0cae089f6d7e0df76
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 4 12:04:00 2023 +0800

    [flink][mysql-cdc] All FileStoreWrites generated in database synchronization job should use the same compaction executor service (#1727)
---
 .../org/apache/paimon/utils/CommonTestUtils.java   |  5 ++
 .../paimon/operation/AbstractFileStoreWrite.java   | 13 ++-
 .../apache/paimon/table/sink/TableWriteImpl.java   |  6 ++
 .../paimon/flink/sink/StoreSinkWriteImpl.java      | 11 +++
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 43 ++++++++--
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  | 98 ++++++++++++++++++----
 6 files changed, 149 insertions(+), 27 deletions(-)

diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java b/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
index 54ddc4ffe..2461d8ba4 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
@@ -80,6 +80,11 @@ public class CommonTestUtils {
         
assertThatThrownBy(code::call).isInstanceOf(expected).hasMessageContaining(msg);
     }
 
+    public static void waitUtil(Supplier<Boolean> condition, Duration timeout, Duration pause)
+            throws TimeoutException, InterruptedException {
+        waitUtil(condition, timeout, pause, "Failed to wait for condition to be true.");
+    }
+
     /**
      * Wait util the given condition is met or timeout.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 352e4b627..ad2fbb720 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -70,6 +70,7 @@ public abstract class AbstractFileStoreWrite<T>
     protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
 
     private ExecutorService lazyCompactExecutor;
+    private boolean closeCompactExecutorWhenLeaving = true;
     private boolean ignorePreviousFiles = false;
     protected boolean isStreamingMode = false;
 
@@ -102,6 +103,11 @@ public abstract class AbstractFileStoreWrite<T>
         this.ignorePreviousFiles = ignorePreviousFiles;
     }
 
+    public void withCompactExecutor(ExecutorService compactExecutor) {
+        this.lazyCompactExecutor = compactExecutor;
+        this.closeCompactExecutorWhenLeaving = false;
+    }
+
     @Override
     public void write(BinaryRow partition, int bucket, T data) throws Exception {
         WriterContainer<T> container = getWriterWrapper(partition, bucket);
@@ -231,7 +237,7 @@ public abstract class AbstractFileStoreWrite<T>
             }
         }
         writers.clear();
-        if (lazyCompactExecutor != null) {
+        if (lazyCompactExecutor != null && closeCompactExecutorWhenLeaving) {
             lazyCompactExecutor.shutdownNow();
         }
     }
@@ -356,6 +362,11 @@ public abstract class AbstractFileStoreWrite<T>
         return lazyCompactExecutor;
     }
 
+    @VisibleForTesting
+    public ExecutorService getCompactExecutor() {
+        return lazyCompactExecutor;
+    }
+
     protected void notifyNewWriter(RecordWriter<T> writer) {}
 
     protected abstract RecordWriter<T> createWriter(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 2c96eac5a..d3fe25007 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -31,6 +31,7 @@ import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.utils.Restorable;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import static org.apache.paimon.utils.Preconditions.checkState;
 
@@ -86,6 +87,11 @@ public class TableWriteImpl<T>
         return this;
     }
 
+    public TableWriteImpl<T> withCompactExecutor(ExecutorService compactExecutor) {
+        write.withCompactExecutor(compactExecutor);
+        return this;
+    }
+
     @Override
     public BinaryRow getPartition(InternalRow row) {
         keyAndBucketExtractor.setRecord(row);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 1b6b4d53f..d1baa5e85 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
@@ -40,6 +41,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -148,6 +150,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         }
     }
 
+    public void withCompactExecutor(ExecutorService compactExecutor) {
+        write.withCompactExecutor(compactExecutor);
+    }
+
     @Override
     public SinkRecord write(InternalRow rowData) throws Exception {
         return write.writeAndReturn(rowData);
@@ -223,4 +229,9 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         write = newTableWrite(newTable);
         write.restore((List) states);
     }
+
+    @VisibleForTesting
+    public TableWriteImpl<?> getWrite() {
+        return write;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 34292d0ef..50799b76f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
@@ -25,12 +26,14 @@ import org.apache.paimon.flink.sink.MultiTableCommittable;
 import org.apache.paimon.flink.sink.PrepareCommitOperator;
 import org.apache.paimon.flink.sink.StateUtils;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
 import org.apache.paimon.flink.sink.StoreSinkWriteState;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ExecutorThreadFactory;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -42,6 +45,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
@@ -61,12 +66,12 @@ public class CdcRecordStoreMultiWriteOperator
     private final Catalog.Loader catalogLoader;
 
     private MemoryPoolFactory memoryPoolFactory;
-
-    protected Catalog catalog;
-    protected Map<Identifier, FileStoreTable> tables;
-    protected StoreSinkWriteState state;
-    protected Map<Identifier, StoreSinkWrite> writes;
-    protected String commitUser;
+    private Catalog catalog;
+    private Map<Identifier, FileStoreTable> tables;
+    private StoreSinkWriteState state;
+    private Map<Identifier, StoreSinkWrite> writes;
+    private String commitUser;
+    private ExecutorService compactExecutor;
 
     public CdcRecordStoreMultiWriteOperator(
             Catalog.Loader catalogLoader,
@@ -96,6 +101,10 @@ public class CdcRecordStoreMultiWriteOperator
         state = new StoreSinkWriteState(context, (tableName, partition, bucket) -> true);
         tables = new HashMap<>();
         writes = new HashMap<>();
+        compactExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory(
+                                Thread.currentThread().getName() + "-CdcMultiWrite-Compaction"));
     }
 
     @Override
@@ -108,8 +117,6 @@ public class CdcRecordStoreMultiWriteOperator
 
         FileStoreTable table = getTable(tableId);
 
-        // TODO set executor service to write
-
         // all table write should share one write buffer so that writers can preempt memory
         // from those of other tables
         if (memoryPoolFactory == null) {
@@ -134,6 +141,8 @@ public class CdcRecordStoreMultiWriteOperator
                                         
getContainingTask().getEnvironment().getIOManager(),
                                         memoryPoolFactory));
 
+        ((StoreSinkWriteImpl) write).withCompactExecutor(compactExecutor);
+
         Optional<GenericRow> optionalConverted =
                 toGenericRow(record.record(), table.schema().fields());
         if (!optionalConverted.isPresent()) {
@@ -201,6 +210,9 @@ public class CdcRecordStoreMultiWriteOperator
         for (StoreSinkWrite write : writes.values()) {
             write.close();
         }
+        if (compactExecutor != null) {
+            compactExecutor.shutdownNow();
+        }
     }
 
     @Override
@@ -219,4 +231,19 @@ public class CdcRecordStoreMultiWriteOperator
         }
         return committables;
     }
+
+    @VisibleForTesting
+    public Map<Identifier, FileStoreTable> tables() {
+        return tables;
+    }
+
+    @VisibleForTesting
+    public Map<Identifier, StoreSinkWrite> writes() {
+        return writes;
+    }
+
+    @VisibleForTesting
+    public String commitUser() {
+        return commitUser;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 675c13da3..fd5046e8b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -24,8 +24,8 @@ import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.sink.MultiTableCommittable;
 import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -37,6 +37,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CommonTestUtils;
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -51,15 +52,16 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -147,16 +149,6 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
     }
 
-    private static FileIO getFileIO(CatalogContext catalogContext, Path warehouse) {
-        FileIO fileIO;
-        try {
-            fileIO = FileIO.get(warehouse, catalogContext);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-        return fileIO;
-    }
-
     @Test
     @Timeout(30)
     public void testAsyncTableCreate() throws Exception {
@@ -245,14 +237,14 @@ public class CdcRecordStoreMultiWriteOperatorTest {
 
         CdcRecordStoreMultiWriteOperator operator =
                 (CdcRecordStoreMultiWriteOperator) harness.getOperator();
-        assertThat(operator.tables.size()).isEqualTo(0);
-        assertThat(operator.writes.size()).isEqualTo(0);
+        assertThat(operator.tables().size()).isEqualTo(0);
+        assertThat(operator.writes().size()).isEqualTo(0);
 
         catalog.createTable(tableId, firstTableSchema, true);
         actual = runner.take();
         assertThat(actual).isEqualTo(expected);
-        assertThat(operator.tables.size()).isEqualTo(1);
-        assertThat(operator.writes.size()).isEqualTo(1);
+        assertThat(operator.tables().size()).isEqualTo(1);
+        assertThat(operator.writes().size()).isEqualTo(1);
 
         // after table is created, record should be processed immediately
         fields = new HashMap<>();
@@ -278,7 +270,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         harness.initializeState(snapshot);
         operator = (CdcRecordStoreMultiWriteOperator) harness.getOperator();
 
-        assertThat(operator.commitUser).isEqualTo(prevCommitUser);
+        assertThat(operator.commitUser()).isEqualTo(prevCommitUser);
 
         runner.stop();
         t.join();
@@ -640,6 +632,76 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         harness.close();
     }
 
+    @Test
+    @Timeout(30)
+    public void testUsingTheSameCompactExecutor() throws Exception {
+        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness =
+                createTestHarness(catalogLoader);
+        harness.open();
+
+        Runner runner = new Runner(harness);
+        Thread t = new Thread(runner);
+        t.start();
+
+        // write records to two tables thus two FileStoreWrite will be created
+        Map<String, String> fields;
+
+        // first table record
+        fields = new HashMap<>();
+        fields.put("pt", "0");
+        fields.put("k", "1");
+        fields.put("v", "10");
+
+        CdcMultiplexRecord expected =
+                CdcMultiplexRecord.fromCdcRecord(
+                        databaseName,
+                        firstTable.getObjectName(),
+                        new CdcRecord(RowKind.INSERT, fields));
+        runner.offer(expected);
+
+        // second table record
+        fields = new HashMap<>();
+        fields.put("k", "1");
+        fields.put("v1", "10");
+        fields.put("v2", "0.625");
+        fields.put("v3", "one");
+        fields.put("v4", "b_one");
+        expected =
+                CdcMultiplexRecord.fromCdcRecord(
+                        databaseName,
+                        secondTable.getObjectName(),
+                        new CdcRecord(RowKind.INSERT, fields));
+        runner.offer(expected);
+
+        // get and check compactExecutor from two FileStoreWrite
+        CdcRecordStoreMultiWriteOperator operator =
+                (CdcRecordStoreMultiWriteOperator) harness.getOperator();
+        CommonTestUtils.waitUtil(
+                () -> operator.writes().size() == 2, Duration.ofSeconds(5), Duration.ofMillis(100));
+
+        List<StoreSinkWrite> storeSinkWrites = new ArrayList<>(operator.writes().values());
+        List<ExecutorService> compactExecutors = new ArrayList<>();
+        for (StoreSinkWrite storeSinkWrite : storeSinkWrites) {
+            StoreSinkWriteImpl storeSinkWriteImpl = (StoreSinkWriteImpl) storeSinkWrite;
+            compactExecutors.add(storeSinkWriteImpl.getWrite().getWrite().getCompactExecutor());
+        }
+        assertThat(compactExecutors.get(0) == compactExecutors.get(1)).isTrue();
+
+        // check that compactExecutor should be shutdown by operator
+        ExecutorService compactExecutor = compactExecutors.get(0);
+        for (StoreSinkWrite storeSinkWrite : storeSinkWrites) {
+            storeSinkWrite.close();
+            assertThat(compactExecutor.isShutdown()).isFalse();
+        }
+
+        operator.close();
+        assertThat(compactExecutor.isShutdown()).isTrue();
+
+        runner.stop();
+        t.join();
+        harness.close();
+    }
+
     private OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable>
             createTestHarness(Catalog.Loader catalogLoader) throws Exception {
         CdcRecordStoreMultiWriteOperator operator =

Reply via email to