This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d4e972e1e4b Add checking for new checkpoint (#14353)
d4e972e1e4b is described below
commit d4e972e1e4b798d7e057e910deea4d192fe57952
Author: panhongan <[email protected]>
AuthorDate: Mon Sep 4 15:48:55 2023 +0800
Add checking for new checkpoint (#14353)
Check that a checkpoint is non-empty before adding it to the checkpoint
sequence in a SeekableStreamSupervisor.
---
.../supervisor/SeekableStreamSupervisor.java | 9 +-
.../SeekableStreamSupervisorStateTest.java | 263 +++++++++++++++++++++
2 files changed, 270 insertions(+), 2 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f79785b8237..2f6cb008b84 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -40,6 +40,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections4.MapUtils;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.error.DruidException;
@@ -691,8 +692,12 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return;
}
final Map<PartitionIdType, SequenceOffsetType> newCheckpoint =
checkpointTaskGroup(taskGroup, false).get();
- taskGroup.addNewCheckpoint(newCheckpoint);
- log.info("Handled checkpoint notice, new checkpoint is [%s] for
taskGroup [%s]", newCheckpoint, taskGroupId);
+ if (MapUtils.isNotEmpty(newCheckpoint)) {
+ taskGroup.addNewCheckpoint(newCheckpoint);
+ log.info("Handled checkpoint notice, new checkpoint is [%s] for
taskGroup [%s]", newCheckpoint, taskGroupId);
+ } else {
+ log.warn("New checkpoint is null or empty for taskGroup [%s]", taskGroupId);
+ }
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 7e8afdf6bc5..819a6baacd8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -19,12 +19,16 @@
package org.apache.druid.indexing.seekablestream.supervisor;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionSchema;
@@ -34,6 +38,8 @@ import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
@@ -44,6 +50,7 @@ import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
@@ -57,6 +64,7 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFac
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -98,6 +106,7 @@ import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -771,6 +780,169 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
verifyAll();
}
+ @Test(timeout = 60_000L)
+ public void testCheckpointForActiveTaskGroup() throws InterruptedException,
JsonProcessingException
+ {
+ DateTime startTime = DateTimes.nowUtc();
+ SeekableStreamSupervisorIOConfig ioConfig = new
SeekableStreamSupervisorIOConfig(
+ STREAM,
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
+ 1,
+ 1,
+ new Period("PT1H"),
+ new Period("PT1S"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ new IdleConfig(true, 200L),
+ null
+ ) {};
+
+ EasyMock.reset(spec);
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new
DruidMonitorSchedulerConfig() {
+ @Override
+ public Duration getEmissionDuration()
+ {
+ return new Period("PT2S").toStandardDuration();
+ }
+ }).anyTimes();
+ EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+ EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
+
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+ SeekableStreamIndexTaskIOConfig taskIoConfig = createTaskIoConfigExt(
+ 0,
+ Collections.singletonMap("0", "10"),
+ Collections.singletonMap("0", "20"),
+ "test",
+ startTime,
+ null,
+ Collections.emptySet(),
+ ioConfig
+ );
+
+ SeekableStreamIndexTaskTuningConfig taskTuningConfig =
getTuningConfig().convertToTaskTuningConfig();
+
+ TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
+ sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));
+
+ Map<String, Object> context = new HashMap<>();
+ context.put("checkpoints", new
ObjectMapper().writeValueAsString(sequenceOffsets));
+
+ SeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
+ "id1",
+ null,
+ getDataSchema(),
+ taskTuningConfig,
+ taskIoConfig,
+ context,
+ "0"
+ );
+
+ SeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
+ "id2",
+ null,
+ getDataSchema(),
+ taskTuningConfig,
+ taskIoConfig,
+ context,
+ "0"
+ );
+
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
+
+ Collection workItems = new ArrayList<>();
+ workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+ workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+
+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+ .andReturn(ImmutableList.of(id1, id2))
+ .anyTimes();
+
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+
+ EasyMock.reset(indexerMetadataStorageCoordinator);
+ EasyMock.expect(
+
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
TestSeekableStreamDataSourceMetadata(null)
+ ).anyTimes();
+
EasyMock.expect(indexTaskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
+
EasyMock.expect(indexTaskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
+
+
EasyMock.expect(indexTaskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
+
EasyMock.expect(indexTaskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
+
+ ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
+ final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, partitionOffset);
+
+
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .anyTimes();
+
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.pauseAsync("id1"))
+ .andReturn(Futures.immediateFuture(partitionOffset))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.pauseAsync("id2"))
+ .andReturn(Futures.immediateFuture(partitionOffset))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset,
false))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset,
false))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.resumeAsync("id1"))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.resumeAsync("id2"))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.stopAsync("id1", false))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.stopAsync("id2", false))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+
+ replayAll();
+
+ SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ supervisor.start();
+ supervisor.runInternal();
+
+ supervisor.checkpoint(
+ 0,
+ new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>(STREAM,
checkpoints.get(0), ImmutableSet.of())
+ )
+ );
+
+ while (supervisor.getNoticesQueueSize() > 0) {
+ Thread.sleep(100);
+ }
+
+ verifyAll();
+
+ Assert.assertEquals(0, supervisor.getNoticesQueueSize());
+ }
@Test
public void testEmitBothLag() throws Exception
@@ -2208,4 +2380,95 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
}
}
}
+
+ private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
+ {
+ private final String taskType;
+ private final TaskLocation location;
+ private final String dataSource;
+
+ TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result,
TaskLocation location)
+ {
+ super(task.getId(), result);
+ this.taskType = task.getType();
+ this.location = location;
+ this.dataSource = task.getDataSource();
+ }
+
+ @Override
+ public TaskLocation getLocation()
+ {
+ return location;
+ }
+
+ @Override
+ public String getTaskType()
+ {
+ return taskType;
+ }
+
+ @Override
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+ }
+
+ private static class TestSeekableStreamDataSourceMetadata extends
SeekableStreamDataSourceMetadata<String, String>
+ {
+
+ @JsonCreator
+ public TestSeekableStreamDataSourceMetadata(
+ @JsonProperty("partitions") SeekableStreamSequenceNumbers<String,
String> partitions
+ )
+ {
+ super(partitions);
+ }
+
+ @Override
+ public DataSourceMetadata asStartMetadata()
+ {
+ final SeekableStreamSequenceNumbers<String, String> sequenceNumbers =
getSeekableStreamSequenceNumbers();
+ if (sequenceNumbers instanceof SeekableStreamEndSequenceNumbers) {
+ return createConcreteDataSourceMetaData(
+ ((SeekableStreamEndSequenceNumbers<String, String>)
sequenceNumbers).asStartPartitions(true)
+ );
+ } else {
+ return this;
+ }
+ }
+
+ @Override
+ protected SeekableStreamDataSourceMetadata<String, String>
createConcreteDataSourceMetaData(
+ SeekableStreamSequenceNumbers<String, String>
seekableStreamSequenceNumbers
+ )
+ {
+ return new
TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers);
+ }
+ }
+
+ private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt(
+ int groupId,
+ Map<String, String> startPartitions,
+ Map<String, String> endPartitions,
+ String baseSequenceName,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
+ Set<String> exclusiveStartSequenceNumberPartitions,
+ SeekableStreamSupervisorIOConfig ioConfig
+ )
+ {
+ return new SeekableStreamIndexTaskIOConfig<String, String>(
+ groupId,
+ baseSequenceName,
+ new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions,
exclusiveStartSequenceNumberPartitions),
+ new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
+ true,
+ minimumMessageTime,
+ maximumMessageTime,
+ ioConfig.getInputFormat()
+ )
+ {
+ };
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]