This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/34.0.0 by this push:
new 7c1b3f1d3a2 [Backport] Address NPE in CloneHistoricals duty (#18293)
7c1b3f1d3a2 is described below
commit 7c1b3f1d3a2ee924440c6c88727d5a5ac6eb18e4
Author: Lucas Capistrant <[email protected]>
AuthorDate: Sun Jul 20 08:51:10 2025 -0500
[Backport] Address NPE in CloneHistoricals duty (#18293)
* Address NPE in CloneHistoricals duty (#18274)
* Modify embedded test for 34.0.0 compatibility
* fix checkstyle
---------
Co-authored-by: Kashif Faraz <[email protected]>
---
.../embedded/server/HistoricalCloningTest.java | 143 +++++++++++++++++++++
.../server/coordinator/duty/CloneHistoricals.java | 60 ++++++++-
2 files changed, 196 insertions(+), 7 deletions(-)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
new file mode 100644
index 00000000000..f4a3f3eaac9
--- /dev/null
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+public class HistoricalCloningTest extends EmbeddedClusterTestBase
+{
+ private final EmbeddedHistorical historical1 = new EmbeddedHistorical();
+ private final EmbeddedHistorical historical2 = new EmbeddedHistorical()
+ .addProperty("druid.plaintextPort", "7083");
+ private final EmbeddedHistorical historical3 = new EmbeddedHistorical()
+ .addProperty("druid.plaintextPort", "6083");
+ private final EmbeddedCoordinator coordinator1 = new EmbeddedCoordinator();
+ private final EmbeddedCoordinator coordinator2 = new EmbeddedCoordinator()
+ .addProperty("druid.plaintextPort", "7081");
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addServer(overlord)
+ .addServer(coordinator1)
+ .addServer(coordinator2)
+ .addServer(new EmbeddedIndexer())
+ .addServer(historical1)
+ .addServer(historical3)
+ .addServer(new EmbeddedBroker())
+ .addServer(new EmbeddedRouter());
+ }
+
+ @Test
+ public void test_cloneHistoricals_inTurboMode_duringCoordinatorLeaderSwitch() throws Exception
+ {
+ runIngestion();
+
+ // Wait for segments to be loaded on historical1
+ coordinator1.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("segment/loadQueue/success")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+ agg -> agg.hasSumAtLeast(20)
+ );
+ coordinator1.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("segment/loadQueue/success")
+ .hasDimension("server", historical1.bindings().selfNode().getHostAndPort())
+ .hasDimension("description", "LOAD: NORMAL"),
+ agg -> agg.hasSumAtLeast(10)
+ );
+
+ // Switch coordinator leader to force syncer to reset
+ coordinator1.stop();
+
+ // Wait for a few coordinator runs so that the server views are refreshed
+ coordinator2.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("coordinator/time")
+ .hasDimension("dutyGroup", "HistoricalManagementDuties"),
+ agg -> agg.hasCountAtLeast(2)
+ );
+
+ // Add historical2 to the cluster
+ cluster.addServer(historical2);
+ historical2.start();
+
+ cluster.callApi().onLeaderCoordinator(
+ c -> c.updateCoordinatorDynamicConfig(
+ CoordinatorDynamicConfig
+ .builder()
+ .withCloneServers(Map.of(historical2.bindings().selfNode().getHostAndPort(), historical1.bindings().selfNode().getHostAndPort()))
+ .withTurboLoadingNodes(Set.of(historical2.bindings().selfNode().getHostAndPort()))
+ .build()
+ )
+ );
+
+ // Wait for the clones to be loaded
+ coordinator2.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("segment/clone/assigned/count")
+ .hasDimension("server", historical2.bindings().selfNode().getHostAndPort()),
+ agg -> agg.hasSumAtLeast(10)
+ );
+ coordinator2.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("segment/loadQueue/success")
+ .hasDimension("server", historical2.bindings().selfNode().getHostAndPort())
+ .hasDimension("description", "LOAD: TURBO"),
+ agg -> agg.hasSumAtLeast(10)
+ );
+ }
+
+ private void runIngestion()
+ {
+ final String taskId = IdUtils.getRandomId();
+ final Object task = createIndexTaskForInlineData(
+ taskId,
+ StringUtils.replace(Resources.CSV_DATA_10_DAYS, "\n", "\\n")
+ );
+
+ cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+ cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+ }
+
+ private Object createIndexTaskForInlineData(String taskId, String inlineDataCsv)
+ {
+ return EmbeddedClusterApis.createTaskFromPayload(
+ taskId,
+ StringUtils.format(Resources.INDEX_TASK_PAYLOAD_WITH_INLINE_DATA, inlineDataCsv, dataSource)
+ );
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
index 70b4e0a950b..fd4c293afc0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator.duty;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CloneStatusManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -34,6 +35,7 @@ import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
+import javax.annotation.Nullable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -102,18 +104,15 @@ public class CloneHistoricals implements CoordinatorDuty
final Set<DataSegment> targetProjectedSegments =
targetServer.getProjectedSegments();
// Load any segments missing in the clone target.
for (DataSegment segment : sourceProjectedSegments) {
- if (!targetProjectedSegments.contains(segment) && loadQueueManager.loadSegment(segment, targetServer, SegmentAction.LOAD)) {
- stats.add(
- Stats.Segments.ASSIGNED_TO_CLONE,
- RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
- 1L
- );
+ if (!targetProjectedSegments.contains(segment)) {
+ loadSegmentOnTargetServer(segment, targetServer, params);
}
}
// Drop any segments missing from the clone source.
for (DataSegment segment : targetProjectedSegments) {
- if (!sourceProjectedSegments.contains(segment) && loadQueueManager.dropSegment(segment, targetServer)) {
+ if (!sourceProjectedSegments.contains(segment)
+ && loadQueueManager.dropSegment(segment, targetServer)) {
stats.add(
Stats.Segments.DROPPED_FROM_CLONE,
RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
@@ -129,6 +128,53 @@ public class CloneHistoricals implements CoordinatorDuty
return params;
}
+ private void loadSegmentOnTargetServer(
+ DataSegment segment,
+ ServerHolder targetServer,
+ DruidCoordinatorRuntimeParams params
+ )
+ {
+ final RowKey.Builder rowKey = RowKey
+ .with(Dimension.SERVER, targetServer.getServer().getName())
+ .with(Dimension.DATASOURCE, segment.getDataSource());
+
+ final DataSegment loadableSegment = getLoadableSegment(segment, params);
+ if (loadableSegment == null) {
+ params.getCoordinatorStats().add(
+ Stats.Segments.ASSIGN_SKIPPED,
+ rowKey.and(Dimension.DESCRIPTION, "Segment not found in metadata cache"),
+ 1L
+ );
+ } else if (loadQueueManager.loadSegment(loadableSegment, targetServer, SegmentAction.LOAD)) {
+ params.getCoordinatorStats().add(
+ Stats.Segments.ASSIGNED_TO_CLONE,
+ rowKey.build(),
+ 1L
+ );
+ }
+ }
+
+ /**
+ * Returns a DataSegment with the correct value of loadSpec (as obtained from
+ * metadata store). This method may return null if there is no snapshot available
+ * for the underlying datasource or if the segment is unused.
+ */
+ @Nullable
+ private DataSegment getLoadableSegment(DataSegment segmentToMove, DruidCoordinatorRuntimeParams params)
+ {
+ if (!params.isUsedSegment(segmentToMove)) {
+ return null;
+ }
+
+ ImmutableDruidDataSource datasource = params.getDataSourcesSnapshot()
+ .getDataSource(segmentToMove.getDataSource());
+ if (datasource == null) {
+ return null;
+ }
+
+ return datasource.getSegment(segmentToMove.getId());
+ }
+
/**
* Create a status map of cloning progress based on the cloneServers mapping
and its current load queue.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]