This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 6355d13a0584ea8bd8d90bb0649e5ab0a84e5f19
Author: Aitozi <[email protected]>
AuthorDate: Wed May 22 19:13:04 2024 +0800

    [flink] Cleanup QueryService file after job is down (#3382)
---
 .../paimon/flink/service/QueryAddressRegister.java | 14 ++++++----
 .../paimon/flink/RemoteLookupJoinITCase.java       | 31 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
index ed883d9eb..df3cf7abf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
@@ -35,13 +35,13 @@ import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
 /** Operator for address server to register addresses to {@link ServiceManager}. */
 public class QueryAddressRegister extends RichSinkFunction<InternalRow> {
 
-    private final Table table;
+    private final ServiceManager serviceManager;
 
     private transient int numberExecutors;
     private transient TreeMap<Integer, InetSocketAddress> executors;
 
     public QueryAddressRegister(Table table) {
-        this.table = table;
+        this.serviceManager = ((FileStoreTable) table).store().newServiceManager();
     }
 
     @Override
@@ -67,10 +67,14 @@ public class QueryAddressRegister extends RichSinkFunction<InternalRow> {
         executors.put(executorId, new InetSocketAddress(hostname, port));
 
         if (executors.size() == numberExecutors) {
-            FileStoreTable storeTable = (FileStoreTable) table;
-            ServiceManager manager = storeTable.store().newServiceManager();
-            manager.resetService(
+            serviceManager.resetService(
                     PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0]));
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        serviceManager.deleteService(PRIMARY_KEY_LOOKUP);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
index 9e7aac281..2d8d072d1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.query.RemoteTableQuery;
+import org.apache.paimon.flink.service.QueryService;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
 import org.apache.paimon.service.server.KvQueryServer;
@@ -34,6 +35,8 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.BlockingIterator;
 
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
@@ -131,6 +134,34 @@ public class RemoteLookupJoinITCase extends CatalogITCaseBase {
         proxy.close();
     }
 
+    @Test
+    public void testServiceFileCleaned() throws Exception {
+        sql(
+                "CREATE TABLE DIM (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket' = '2', 'continuous.discovery-interval' = '1ms')");
+        JobClient client = queryService(paimonTable("DIM"));
+
+        RemoteTableQuery query = new RemoteTableQuery(paimonTable("DIM"));
+
+        sql("INSERT INTO DIM VALUES (1, 11), (2, 22), (3, 33), (4, 44)");
+        Thread.sleep(2000);
+
+        assertThat(query.lookup(row(), 0, row(1)))
+                .isNotNull()
+                .extracting(r -> r.getInt(1))
+                .isEqualTo(11);
+
+        client.cancel().get();
+        query.close();
+        ServiceManager serviceManager = paimonTable("DIM").store().newServiceManager();
+        assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse();
+    }
+
+    private JobClient queryService(FileStoreTable table) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        QueryService.build(env, table, 2);
+        return env.executeAsync();
+    }
+
     private ServiceProxy launchQueryServer(String tableName) throws Throwable {
         FileStoreTable table = (FileStoreTable) paimonTable(tableName);
         LocalTableQuery query = table.newLocalTableQuery().withIOManager(IOManager.create(path));

Reply via email to