This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b54bfbe3b7 Fix JdbcRegistry will get duplicate children (#16348)
b54bfbe3b7 is described below
commit b54bfbe3b7fdea1d24ec806b2170d7a2d758e52a
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Jul 20 09:25:21 2024 +0800
Fix JdbcRegistry will get duplicate children (#16348)
---
.../plugin/registry/RegistryTestCase.java | 49 +++++++++++-----------
.../plugin/registry/jdbc/JdbcRegistry.java | 1 +
2 files changed, 25 insertions(+), 25 deletions(-)
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
index 1edb0327f8..3d0c169e59 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.registry;
+import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,14 +36,11 @@ import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
-import org.assertj.core.util.Lists;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import com.google.common.truth.Truth;
-
public abstract class RegistryTestCase<R extends Registry> {
protected R registry;
@@ -62,7 +60,7 @@ public abstract class RegistryTestCase<R extends Registry> {
@Test
public void testIsConnected() {
registry.start();
- Truth.assertThat(registry.isConnected()).isTrue();
+ assertThat(registry.isConnected()).isTrue();
}
@Test
@@ -119,7 +117,7 @@ public abstract class RegistryTestCase<R extends Registry> {
AtomicReference<ConnectionState> connectionState = new
AtomicReference<>();
registry.addConnectionStateListener(connectionState::set);
- Truth.assertThat(connectionState.get()).isNull();
+ assertThat(connectionState.get()).isNull();
registry.start();
await().atMost(Duration.ofSeconds(2))
@@ -134,7 +132,7 @@ public abstract class RegistryTestCase<R extends Registry> {
String value = "127.0.0.1:8080";
assertThrows(RegistryException.class, () -> registry.get(key));
registry.put(key, value, true);
- Truth.assertThat(registry.get(key)).isEqualTo(value);
+ assertThat(registry.get(key)).isEqualTo(value);
}
@Test
@@ -143,11 +141,11 @@ public abstract class RegistryTestCase<R extends
Registry> {
String key = "/nodes/master" + System.nanoTime();
String value = "127.0.0.1:8080";
registry.put(key, value, true);
- Truth.assertThat(registry.get(key)).isEqualTo(value);
+ assertThat(registry.get(key)).isEqualTo(value);
// Update the value
registry.put(key, "123", true);
- Truth.assertThat(registry.get(key)).isEqualTo("123");
+ assertThat(registry.get(key)).isEqualTo("123");
}
@Test
@@ -159,22 +157,23 @@ public abstract class RegistryTestCase<R extends
Registry> {
registry.delete(key);
registry.put(key, value, true);
- Truth.assertThat(registry.get(key)).isEqualTo(value);
+ assertThat(registry.get(key)).isEqualTo(value);
registry.delete(key);
- Truth.assertThat(registry.exists(key)).isFalse();
+ assertThat(registry.exists(key)).isFalse();
}
@Test
public void testChildren() {
registry.start();
- String master1 = "/nodes/children/127.0.0.1:8080";
- String master2 = "/nodes/children/127.0.0.2:8080";
+ String master1 = "/nodes/children/childGroup1/127.0.0.1:8080";
+ String master2 = "/nodes/children/childGroup1/127.0.0.2:8080";
String value = "123";
registry.put(master1, value, true);
registry.put(master2, value, true);
- Truth.assertThat(registry.children("/nodes/children"))
-
.containsAtLeastElementsIn(Lists.newArrayList("127.0.0.1:8080",
"127.0.0.2:8080"));
+
assertThat(registry.children("/nodes/children")).containsExactly("childGroup1");
+
assertThat(registry.children("/nodes/children/childGroup1")).containsExactly("127.0.0.1:8080",
+ "127.0.0.2:8080");
}
@Test
@@ -182,9 +181,9 @@ public abstract class RegistryTestCase<R extends Registry> {
registry.start();
String key = "/nodes/master" + System.nanoTime();
String value = "123";
- Truth.assertThat(registry.exists(key)).isFalse();
+ assertThat(registry.exists(key)).isFalse();
registry.put(key, value, true);
- Truth.assertThat(registry.exists(key)).isTrue();
+ assertThat(registry.exists(key)).isTrue();
}
@@ -195,10 +194,10 @@ public abstract class RegistryTestCase<R extends
Registry> {
String lockKey = "/lock" + System.nanoTime();
// 1. Acquire the lock at the main thread
- Truth.assertThat(registry.acquireLock(lockKey)).isTrue();
+ assertThat(registry.acquireLock(lockKey)).isTrue();
// Acquire the lock at the main thread again
// It should acquire success
- Truth.assertThat(registry.acquireLock(lockKey)).isTrue();
+ assertThat(registry.acquireLock(lockKey)).isTrue();
// Acquire the lock at another thread
// It should acquire failed
@@ -213,17 +212,17 @@ public abstract class RegistryTestCase<R extends
Registry> {
registry.start();
String lockKey = "/lock" + System.nanoTime();
// 1. Acquire the lock in the main thread
- Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
// Acquire the lock in the main thread
// It should acquire success
- Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
// Acquire the lock at another thread
// It should acquire failed
CompletableFuture<Boolean> acquireResult =
CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
- Truth.assertThat(acquireResult.get()).isFalse();
+ assertThat(acquireResult.get()).isFalse();
}
@@ -233,21 +232,21 @@ public abstract class RegistryTestCase<R extends
Registry> {
registry.start();
String lockKey = "/lock" + System.nanoTime();
// 1. Acquire the lock in the main thread
- Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
// Acquire the lock at another thread
// It should acquire failed
CompletableFuture<Boolean> acquireResult =
CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
- Truth.assertThat(acquireResult.get()).isFalse();
+ assertThat(acquireResult.get()).isFalse();
// 2. Release the lock in the main thread
- Truth.assertThat(registry.releaseLock(lockKey)).isTrue();
+ assertThat(registry.releaseLock(lockKey)).isTrue();
// Acquire the lock at another thread
// It should acquire success
acquireResult = CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
- Truth.assertThat(acquireResult.get()).isTrue();
+ assertThat(acquireResult.get()).isTrue();
}
public abstract R createRegistry();
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
index c8f205dafd..cc646e12e5 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
@@ -212,6 +212,7 @@ public final class JdbcRegistry implements Registry {
.map(JdbcRegistryDataDTO::getDataKey)
.filter(fullPath -> fullPath.length() > key.length())
.map(fullPath ->
StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"))
+ .distinct()
.collect(Collectors.toList());
} catch (Exception e) {
throw new RegistryException(String.format("Get key: %s children
error", key), e);