This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 6355d13a0584ea8bd8d90bb0649e5ab0a84e5f19 Author: Aitozi <[email protected]> AuthorDate: Wed May 22 19:13:04 2024 +0800 [flink] Cleanup QueryService file after job is down (#3382) --- .../paimon/flink/service/QueryAddressRegister.java | 14 ++++++---- .../paimon/flink/RemoteLookupJoinITCase.java | 31 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java index ed883d9eb..df3cf7abf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java @@ -35,13 +35,13 @@ import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP; /** Operator for address server to register addresses to {@link ServiceManager}. */ public class QueryAddressRegister extends RichSinkFunction<InternalRow> { - private final Table table; + private final ServiceManager serviceManager; private transient int numberExecutors; private transient TreeMap<Integer, InetSocketAddress> executors; public QueryAddressRegister(Table table) { - this.table = table; + this.serviceManager = ((FileStoreTable) table).store().newServiceManager(); } @Override @@ -67,10 +67,14 @@ public class QueryAddressRegister extends RichSinkFunction<InternalRow> { executors.put(executorId, new InetSocketAddress(hostname, port)); if (executors.size() == numberExecutors) { - FileStoreTable storeTable = (FileStoreTable) table; - ServiceManager manager = storeTable.store().newServiceManager(); - manager.resetService( + serviceManager.resetService( PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0])); } } + + @Override + public void close() throws Exception { + super.close(); + serviceManager.deleteService(PRIMARY_KEY_LOOKUP); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java index 9e7aac281..2d8d072d1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.query.RemoteTableQuery; +import org.apache.paimon.flink.service.QueryService; import org.apache.paimon.service.ServiceManager; import org.apache.paimon.service.network.stats.DisabledServiceRequestStats; import org.apache.paimon.service.server.KvQueryServer; @@ -34,6 +35,8 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.BlockingIterator; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; @@ -131,6 +134,34 @@ public class RemoteLookupJoinITCase extends CatalogITCaseBase { proxy.close(); } + @Test + public void testServiceFileCleaned() throws Exception { + sql( + "CREATE TABLE DIM (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket' = '2', 'continuous.discovery-interval' = '1ms')"); + JobClient client = queryService(paimonTable("DIM")); + + RemoteTableQuery query = new RemoteTableQuery(paimonTable("DIM")); + + sql("INSERT INTO DIM VALUES (1, 11), (2, 22), (3, 33), (4, 44)"); + Thread.sleep(2000); + + assertThat(query.lookup(row(), 0, row(1))) + .isNotNull() + .extracting(r -> r.getInt(1)) + .isEqualTo(11); + + client.cancel().get(); + query.close(); + ServiceManager serviceManager = paimonTable("DIM").store().newServiceManager(); + assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse(); + } + + private JobClient queryService(FileStoreTable table) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + QueryService.build(env, table, 2); + return env.executeAsync(); + } + private ServiceProxy launchQueryServer(String tableName) throws Throwable { FileStoreTable table = (FileStoreTable) paimonTable(tableName); LocalTableQuery query = table.newLocalTableQuery().withIOManager(IOManager.create(path));
