This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4eadf1cf3 [flink][mysql-cdc] All FileStoreWrites generated in database synchronization job should use the same compaction executor service (#1727)
4eadf1cf3 is described below
commit 4eadf1cf3609e098d2a756c0cae089f6d7e0df76
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 4 12:04:00 2023 +0800
[flink][mysql-cdc] All FileStoreWrites generated in database synchronization job should use the same compaction executor service (#1727)
---
.../org/apache/paimon/utils/CommonTestUtils.java | 5 ++
.../paimon/operation/AbstractFileStoreWrite.java | 13 ++-
.../apache/paimon/table/sink/TableWriteImpl.java | 6 ++
.../paimon/flink/sink/StoreSinkWriteImpl.java | 11 +++
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 43 ++++++++--
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 98 ++++++++++++++++++----
6 files changed, 149 insertions(+), 27 deletions(-)
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java b/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
index 54ddc4ffe..2461d8ba4 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/CommonTestUtils.java
@@ -80,6 +80,11 @@ public class CommonTestUtils {
assertThatThrownBy(code::call).isInstanceOf(expected).hasMessageContaining(msg);
}
+ public static void waitUtil(Supplier<Boolean> condition, Duration timeout, Duration pause)
+ throws TimeoutException, InterruptedException {
+ waitUtil(condition, timeout, pause, "Failed to wait for condition to be true.");
+ }
+
/**
* Wait util the given condition is met or timeout.
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 352e4b627..ad2fbb720 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -70,6 +70,7 @@ public abstract class AbstractFileStoreWrite<T>
protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
private ExecutorService lazyCompactExecutor;
+ private boolean closeCompactExecutorWhenLeaving = true;
private boolean ignorePreviousFiles = false;
protected boolean isStreamingMode = false;
@@ -102,6 +103,11 @@ public abstract class AbstractFileStoreWrite<T>
this.ignorePreviousFiles = ignorePreviousFiles;
}
+ public void withCompactExecutor(ExecutorService compactExecutor) {
+ this.lazyCompactExecutor = compactExecutor;
+ this.closeCompactExecutorWhenLeaving = false;
+ }
+
@Override
public void write(BinaryRow partition, int bucket, T data) throws Exception {
WriterContainer<T> container = getWriterWrapper(partition, bucket);
@@ -231,7 +237,7 @@ public abstract class AbstractFileStoreWrite<T>
}
}
writers.clear();
- if (lazyCompactExecutor != null) {
+ if (lazyCompactExecutor != null && closeCompactExecutorWhenLeaving) {
lazyCompactExecutor.shutdownNow();
}
}
@@ -356,6 +362,11 @@ public abstract class AbstractFileStoreWrite<T>
return lazyCompactExecutor;
}
+ @VisibleForTesting
+ public ExecutorService getCompactExecutor() {
+ return lazyCompactExecutor;
+ }
+
protected void notifyNewWriter(RecordWriter<T> writer) {}
protected abstract RecordWriter<T> createWriter(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 2c96eac5a..d3fe25007 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -31,6 +31,7 @@ import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.utils.Restorable;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -86,6 +87,11 @@ public class TableWriteImpl<T>
return this;
}
+ public TableWriteImpl<T> withCompactExecutor(ExecutorService compactExecutor) {
+ write.withCompactExecutor(compactExecutor);
+ return this;
+ }
+
@Override
public BinaryRow getPartition(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 1b6b4d53f..d1baa5e85 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
@@ -40,6 +41,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -148,6 +150,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
}
}
+ public void withCompactExecutor(ExecutorService compactExecutor) {
+ write.withCompactExecutor(compactExecutor);
+ }
+
@Override
public SinkRecord write(InternalRow rowData) throws Exception {
return write.writeAndReturn(rowData);
@@ -223,4 +229,9 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
write = newTableWrite(newTable);
write.restore((List) states);
}
+
+ @VisibleForTesting
+ public TableWriteImpl<?> getWrite() {
+ return write;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 34292d0ef..50799b76f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
@@ -25,12 +26,14 @@ import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StateUtils;
import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -42,6 +45,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
@@ -61,12 +66,12 @@ public class CdcRecordStoreMultiWriteOperator
private final Catalog.Loader catalogLoader;
private MemoryPoolFactory memoryPoolFactory;
-
- protected Catalog catalog;
- protected Map<Identifier, FileStoreTable> tables;
- protected StoreSinkWriteState state;
- protected Map<Identifier, StoreSinkWrite> writes;
- protected String commitUser;
+ private Catalog catalog;
+ private Map<Identifier, FileStoreTable> tables;
+ private StoreSinkWriteState state;
+ private Map<Identifier, StoreSinkWrite> writes;
+ private String commitUser;
+ private ExecutorService compactExecutor;
public CdcRecordStoreMultiWriteOperator(
Catalog.Loader catalogLoader,
@@ -96,6 +101,10 @@ public class CdcRecordStoreMultiWriteOperator
state = new StoreSinkWriteState(context, (tableName, partition, bucket) -> true);
tables = new HashMap<>();
writes = new HashMap<>();
+ compactExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory(
+ Thread.currentThread().getName() + "-CdcMultiWrite-Compaction"));
}
@Override
@@ -108,8 +117,6 @@ public class CdcRecordStoreMultiWriteOperator
FileStoreTable table = getTable(tableId);
- // TODO set executor service to write
-
// all table write should share one write buffer so that writers can preempt memory
// from those of other tables
if (memoryPoolFactory == null) {
@@ -134,6 +141,8 @@ public class CdcRecordStoreMultiWriteOperator
getContainingTask().getEnvironment().getIOManager(),
memoryPoolFactory));
+ ((StoreSinkWriteImpl) write).withCompactExecutor(compactExecutor);
+
Optional<GenericRow> optionalConverted =
toGenericRow(record.record(), table.schema().fields());
if (!optionalConverted.isPresent()) {
@@ -201,6 +210,9 @@ public class CdcRecordStoreMultiWriteOperator
for (StoreSinkWrite write : writes.values()) {
write.close();
}
+ if (compactExecutor != null) {
+ compactExecutor.shutdownNow();
+ }
}
@Override
@@ -219,4 +231,19 @@ public class CdcRecordStoreMultiWriteOperator
}
return committables;
}
+
+ @VisibleForTesting
+ public Map<Identifier, FileStoreTable> tables() {
+ return tables;
+ }
+
+ @VisibleForTesting
+ public Map<Identifier, StoreSinkWrite> writes() {
+ return writes;
+ }
+
+ @VisibleForTesting
+ public String commitUser() {
+ return commitUser;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 675c13da3..fd5046e8b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -24,8 +24,8 @@ import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -37,6 +37,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.common.ExecutionConfig;
@@ -51,15 +52,16 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
-import java.io.IOException;
-import java.io.UncheckedIOException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -147,16 +149,6 @@ public class CdcRecordStoreMultiWriteOperatorTest {
assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
}
- private static FileIO getFileIO(CatalogContext catalogContext, Path warehouse) {
- FileIO fileIO;
- try {
- fileIO = FileIO.get(warehouse, catalogContext);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- return fileIO;
- }
-
@Test
@Timeout(30)
public void testAsyncTableCreate() throws Exception {
@@ -245,14 +237,14 @@ public class CdcRecordStoreMultiWriteOperatorTest {
CdcRecordStoreMultiWriteOperator operator =
(CdcRecordStoreMultiWriteOperator) harness.getOperator();
- assertThat(operator.tables.size()).isEqualTo(0);
- assertThat(operator.writes.size()).isEqualTo(0);
+ assertThat(operator.tables().size()).isEqualTo(0);
+ assertThat(operator.writes().size()).isEqualTo(0);
catalog.createTable(tableId, firstTableSchema, true);
actual = runner.take();
assertThat(actual).isEqualTo(expected);
- assertThat(operator.tables.size()).isEqualTo(1);
- assertThat(operator.writes.size()).isEqualTo(1);
+ assertThat(operator.tables().size()).isEqualTo(1);
+ assertThat(operator.writes().size()).isEqualTo(1);
// after table is created, record should be processed immediately
fields = new HashMap<>();
@@ -278,7 +270,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
harness.initializeState(snapshot);
operator = (CdcRecordStoreMultiWriteOperator) harness.getOperator();
- assertThat(operator.commitUser).isEqualTo(prevCommitUser);
+ assertThat(operator.commitUser()).isEqualTo(prevCommitUser);
runner.stop();
t.join();
@@ -640,6 +632,76 @@ public class CdcRecordStoreMultiWriteOperatorTest {
harness.close();
}
+ @Test
+ @Timeout(30)
+ public void testUsingTheSameCompactExecutor() throws Exception {
+ OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness =
+ createTestHarness(catalogLoader);
+ harness.open();
+
+ Runner runner = new Runner(harness);
+ Thread t = new Thread(runner);
+ t.start();
+
+ // write records to two tables thus two FileStoreWrite will be created
+ Map<String, String> fields;
+
+ // first table record
+ fields = new HashMap<>();
+ fields.put("pt", "0");
+ fields.put("k", "1");
+ fields.put("v", "10");
+
+ CdcMultiplexRecord expected =
+ CdcMultiplexRecord.fromCdcRecord(
+ databaseName,
+ firstTable.getObjectName(),
+ new CdcRecord(RowKind.INSERT, fields));
+ runner.offer(expected);
+
+ // second table record
+ fields = new HashMap<>();
+ fields.put("k", "1");
+ fields.put("v1", "10");
+ fields.put("v2", "0.625");
+ fields.put("v3", "one");
+ fields.put("v4", "b_one");
+ expected =
+ CdcMultiplexRecord.fromCdcRecord(
+ databaseName,
+ secondTable.getObjectName(),
+ new CdcRecord(RowKind.INSERT, fields));
+ runner.offer(expected);
+
+ // get and check compactExecutor from two FileStoreWrite
+ CdcRecordStoreMultiWriteOperator operator =
+ (CdcRecordStoreMultiWriteOperator) harness.getOperator();
+ CommonTestUtils.waitUtil(
+ () -> operator.writes().size() == 2, Duration.ofSeconds(5), Duration.ofMillis(100));
+
+ List<StoreSinkWrite> storeSinkWrites = new ArrayList<>(operator.writes().values());
+ List<ExecutorService> compactExecutors = new ArrayList<>();
+ for (StoreSinkWrite storeSinkWrite : storeSinkWrites) {
+ StoreSinkWriteImpl storeSinkWriteImpl = (StoreSinkWriteImpl) storeSinkWrite;
+ compactExecutors.add(storeSinkWriteImpl.getWrite().getWrite().getCompactExecutor());
+ }
+ assertThat(compactExecutors.get(0) == compactExecutors.get(1)).isTrue();
+
+ // check that compactExecutor should be shutdown by operator
+ ExecutorService compactExecutor = compactExecutors.get(0);
+ for (StoreSinkWrite storeSinkWrite : storeSinkWrites) {
+ storeSinkWrite.close();
+ assertThat(compactExecutor.isShutdown()).isFalse();
+ }
+
+ operator.close();
+ assertThat(compactExecutor.isShutdown()).isTrue();
+
+ runner.stop();
+ t.join();
+ harness.close();
+ }
+
private OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable>
createTestHarness(Catalog.Loader catalogLoader) throws Exception {
CdcRecordStoreMultiWriteOperator operator =