This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9d8aa9e66 [flink] Fix unstable
RemoteLookupJoinITCase.testServiceFileCleaned test method. (#3653)
9d8aa9e66 is described below
commit 9d8aa9e665222aeda8264783c5007b03f83247f3
Author: Kerwin <[email protected]>
AuthorDate: Tue Jul 2 15:26:24 2024 +0800
[flink] Fix unstable RemoteLookupJoinITCase.testServiceFileCleaned test
method. (#3653)
---
.../main/java/org/apache/paimon/flink/query/RemoteTableQuery.java | 7 +++++++
.../test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java | 6 ++----
.../main/java/org/apache/paimon/service/client/KvQueryClient.java | 6 +++++-
3 files changed, 14 insertions(+), 5 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
index fc07e58f9..62edd6bae 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.query;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
@@ -35,6 +36,7 @@ import org.apache.paimon.utils.TypeUtils;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
@@ -109,4 +111,9 @@ public class RemoteTableQuery implements TableQuery {
public void close() throws IOException {
client.shutdown();
}
+
+ @VisibleForTesting
+ public CompletableFuture<Void> cancel() {
+ return client.shutdownFuture();
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
index 13c674662..336c58217 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
@@ -39,7 +39,6 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.Closeable;
@@ -95,7 +94,7 @@ public class RemoteLookupJoinITCase extends CatalogITCaseBase
{
assertThat(query.lookup(row(), 0, row(5))).isNull();
service.close();
- query.close();
+ query.cancel().get();
}
@Test
@@ -135,7 +134,6 @@ public class RemoteLookupJoinITCase extends
CatalogITCaseBase {
proxy.close();
}
- @Disabled // TODO Fix unstable
@Test
public void testServiceFileCleaned() throws Exception {
sql(
@@ -153,7 +151,7 @@ public class RemoteLookupJoinITCase extends
CatalogITCaseBase {
.isEqualTo(11);
client.cancel().get();
- query.close();
+ query.cancel().get();
ServiceManager serviceManager =
paimonTable("DIM").store().newServiceManager();
assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse();
}
diff --git
a/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java
b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java
index 60c1ac0f8..a1b950c27 100644
---
a/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java
+++
b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java
@@ -110,10 +110,14 @@ public class KvQueryClient {
public void shutdown() {
try {
- networkClient.shutdown().get(10L, TimeUnit.SECONDS);
+ shutdownFuture().get(60L, TimeUnit.SECONDS);
LOG.info("{} was shutdown successfully.",
networkClient.getClientName());
} catch (Exception e) {
LOG.warn(String.format("%s shutdown failed.",
networkClient.getClientName()), e);
}
}
+
+ public CompletableFuture<Void> shutdownFuture() {
+ return networkClient.shutdown();
+ }
}