This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 07a119abfbac0ce29981f048cd8d9df326f1c4e7 Author: Mingyu Chen <[email protected]> AuthorDate: Wed Jun 1 17:55:30 2022 +0800 [fix](routine-load) fix bug that routine load task can not find backend (#9902) Introduced from #9492. --- .../doris/load/routineload/RoutineLoadManager.java | 5 +++-- .../org/apache/doris/system/SystemInfoService.java | 21 ++++++++++++++++----- .../apache/doris/system/SystemInfoServiceTest.java | 13 +++++++++---- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index ed220f88b8..9dac66b996 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -496,13 +496,14 @@ public class RoutineLoadManager implements Writable { } else { tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser()); if (tags == UserProperty.INVALID_RESOURCE_TAGS) { - // user may be dropped. Here we fall back to use replica tag + // user may be dropped, or may not set resource tag property. + // Here we fall back to use replica tag tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId()); } } BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster) .addTags(tags).build(); - return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000); + return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1 /* as many as possible */); } private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 82571cd3d8..ef31b53e19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -776,14 +776,16 @@ public class SystemInfoService { * Select a set of backends by the given policy. * * @param policy - * @param number number of backends which need to be selected. + * @param number number of backends which need to be selected. -1 means return as many as possible. * @return return #number of backend ids, * or empty set if no backends match the policy, or the number of matched backends is less than "number"; */ public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) { + Preconditions.checkArgument(number >= -1); List<Backend> candidates = idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList()); - if (candidates.size() < number) { + if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) { + LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); return Lists.newArrayList(); } // If only need one Backend, just return a random one. @@ -794,7 +796,11 @@ public class SystemInfoService { if (policy.allowOnSameHost) { Collections.shuffle(candidates); - return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + if (number == -1) { + return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); + } else { + return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + } } // for each host, random select one backend. @@ -813,11 +819,16 @@ public class SystemInfoService { Collections.shuffle(list); candidates.add(list.get(0)); } - if (candidates.size() < number) { + if (number != -1 && candidates.size() < number) { + LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); return Lists.newArrayList(); } Collections.shuffle(candidates); - return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + if (number != -1) { + return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + } else { + return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); + } } public ImmutableMap<Long, Backend> getIdToBackend() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index b2570095a0..9134c66c44 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -185,15 +185,20 @@ public class SystemInfoServiceTest { BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb)) .setStorageMedium(TStorageMedium.SSD).build(); Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size()); + Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy10, 3).size()); + // check return as many as possible + Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, -1).size()); Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size()); - BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb)) - .setStorageMedium(TStorageMedium.HDD).build(); + BeSelectionPolicy policy11 = + new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb)).setStorageMedium(TStorageMedium.HDD) + .build(); Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size()); // 7. check disk usage - BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)) - .setStorageMedium(TStorageMedium.HDD).build(); + BeSelectionPolicy policy12 = + new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)).setStorageMedium(TStorageMedium.HDD) + .build(); Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size()); BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)) .setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
