This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2e9548d refactor SeekableStreamSupervisor usage of RecordSupplier (#9819)
2e9548d is described below
commit 2e9548d93d5a27e824cc93293e1de55af63d158f
Author: Clint Wylie <[email protected]>
AuthorDate: Sat May 16 14:09:39 2020 -0700
refactor SeekableStreamSupervisor usage of RecordSupplier (#9819)
* refactor SeekableStreamSupervisor usage of RecordSupplier to reduce
contention between the background threads and the main thread; refactor
KinesisRecordSupplier; refactor Kinesis lag metric collection and emission
* fix style and test
* cleanup, refactor, javadocs, test
* fixes
* keep collecting current offsets and lag in the background reporting
thread even when unhealthy
* address review feedback
* add comment
---
.../java/org/apache/druid/indexer/TaskIdUtils.java | 8 +
.../indexing/kafka/supervisor/KafkaSupervisor.java | 44 +-
.../druid/indexing/kinesis/KinesisIndexTask.java | 10 +-
.../indexing/kinesis/KinesisRecordSupplier.java | 647 +++++++++++----------
.../druid/indexing/kinesis/KinesisSamplerSpec.java | 3 +-
.../indexing/kinesis/KinesisSequenceNumber.java | 28 +-
.../kinesis/supervisor/KinesisSupervisor.java | 43 +-
.../indexing/kinesis/KinesisIndexTaskTest.java | 2 +-
.../kinesis/KinesisRecordSupplierTest.java | 64 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 166 +++---
.../supervisor/SeekableStreamSupervisor.java | 164 +++---
.../SeekableStreamSupervisorStateTest.java | 20 +-
.../supervisor/SupervisorStateManager.java | 7 +-
13 files changed, 669 insertions(+), 537 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
index a88341b..76317d9 100644
--- a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
+++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexer;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.druid.java.util.common.StringUtils;
@@ -31,6 +32,8 @@ public class TaskIdUtils
{
private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S
].*");
+ private static final Joiner UNDERSCORE_JOINER = Joiner.on("_");
+
public static void validateId(String thingToValidate, String
stringToValidate)
{
Preconditions.checkArgument(
@@ -60,4 +63,9 @@ public class TaskIdUtils
}
return suffix.toString();
}
+
+ public static String getRandomIdWithPrefix(String prefix)
+ {
+ return UNDERSCORE_JOINER.join(prefix, TaskIdUtils.getRandomId());
+ }
}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 6c88c5a..b2352fe 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
@@ -46,6 +45,7 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
@@ -219,7 +219,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<Integer, Long>
List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
- String taskId = Joiner.on("_").join(baseSequenceName,
TaskIdUtils.getRandomId());
+ String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KafkaIndexTask(
taskId,
new TaskResource(baseSequenceName, 1),
@@ -334,16 +334,38 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<Integer, Long>
}
@Override
- protected void updateLatestSequenceFromStream(
- RecordSupplier<Integer, Long> recordSupplier,
- Set<StreamPartition<Integer>> partitions
- )
+ protected void updatePartitionLagFromStream()
{
- latestSequenceFromStream = partitions.stream()
- .collect(Collectors.toMap(
- StreamPartition::getPartitionId,
- recordSupplier::getPosition
- ));
+ getRecordSupplierLock().lock();
+ try {
+ Set<Integer> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ Set<StreamPartition<Integer>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+ recordSupplier.seekToLatest(partitions);
+
+ // this method isn't actually computing the lag, just fetching the
latests offsets from the stream. This is
+ // because we currently only have record lag for kafka, which can be
lazily computed by subtracting the highest
+ // task offsets from the latest offsets from the stream when it is needed
+ latestSequenceFromStream =
+
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId,
recordSupplier::getPosition));
+ }
+ catch (InterruptedException e) {
+ throw new StreamException(e);
+ }
+ finally {
+ getRecordSupplierLock().unlock();
+ }
}
@Override
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 5d5a307..bfd3758 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
@@ -97,8 +98,12 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String>
KinesisIndexTaskTuningConfig tuningConfig =
((KinesisIndexTaskTuningConfig) super.tuningConfig);
int fetchThreads = tuningConfig.getFetchThreads() != null
? tuningConfig.getFetchThreads()
- : Math.max(1,
ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+ : Runtime.getRuntime().availableProcessors() * 2;
+ Preconditions.checkArgument(
+ fetchThreads > 0,
+ "Must have at least one background fetch thread for the record
supplier"
+ );
return new KinesisRecordSupplier(
KinesisRecordSupplier.getAmazonKinesisClient(
ioConfig.getEndpoint(),
@@ -114,7 +119,8 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String>
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getFetchSequenceNumberTimeout(),
- tuningConfig.getMaxRecordsPerPoll()
+ tuningConfig.getMaxRecordsPerPoll(),
+ false
);
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index f4b7830..b32ee14 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -54,6 +54,7 @@ import
org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -82,7 +83,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
@@ -95,6 +98,14 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000;
private static final long EXCEPTION_RETRY_DELAY_MS = 10000;
+ /**
+ * We call getRecords with limit 1000 to make sure that we can find the
first (earliest) record in the shard.
+ * In the case where the shard is constantly removing records that are past
their retention period, it is possible
+ * that we never find the first record in the shard if we use a limit of 1.
+ */
+ private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
+ private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
+
private static boolean isServiceExceptionRecoverable(AmazonServiceException
ex)
{
final boolean isIOException = ex.getCause() instanceof IOException;
@@ -102,6 +113,37 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
return isIOException || isTimeout;
}
+ /**
+ * Returns an array with the content between the position and limit of
"buffer". This may be the buffer's backing
+ * array itself. Does not modify position or limit of the buffer.
+ */
+ private static byte[] toByteArray(final ByteBuffer buffer)
+ {
+ if (buffer.hasArray()
+ && buffer.arrayOffset() == 0
+ && buffer.position() == 0
+ && buffer.array().length == buffer.limit()) {
+ return buffer.array();
+ } else {
+ final byte[] retVal = new byte[buffer.remaining()];
+ buffer.duplicate().get(retVal);
+ return retVal;
+ }
+ }
+
+ /**
+ * Catch any exception and wrap it in a {@link StreamException}
+ */
+ private static <T> T wrapExceptions(Callable<T> callable)
+ {
+ try {
+ return callable.call();
+ }
+ catch (Exception e) {
+ throw new StreamException(e);
+ }
+ }
+
private class PartitionResource
{
private final StreamPartition<String> streamPartition;
@@ -112,101 +154,80 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
// to indicate that this shard has no more records to read
@Nullable
private volatile String shardIterator;
- private volatile boolean started;
- private volatile boolean stopRequested;
-
private volatile long currentLagMillis;
- PartitionResource(StreamPartition<String> streamPartition)
+ private final AtomicBoolean fetchStarted = new AtomicBoolean();
+ private ScheduledFuture<?> currentFetch;
+
+ private PartitionResource(StreamPartition<String> streamPartition)
{
this.streamPartition = streamPartition;
}
- void startBackgroundFetch()
+ private void startBackgroundFetch()
{
- if (started) {
+ if (!backgroundFetchEnabled) {
return;
}
+ // if seek has been called
+ if (shardIterator == null) {
+ log.warn(
+ "Skipping background fetch for stream[%s] partition[%s] since seek
has not been called for this partition",
+ streamPartition.getStream(),
+ streamPartition.getPartitionId()
+ );
+ return;
+ }
+ if (fetchStarted.compareAndSet(false, true)) {
+ log.debug(
+ "Starting scheduled fetch for stream[%s] partition[%s]",
+ streamPartition.getStream(),
+ streamPartition.getPartitionId()
+ );
- log.info(
- "Starting scheduled fetch runnable for stream[%s] partition[%s]",
- streamPartition.getStream(),
- streamPartition.getPartitionId()
- );
-
- stopRequested = false;
- started = true;
-
- rescheduleRunnable(fetchDelayMillis);
- }
-
- void stopBackgroundFetch()
- {
- log.info(
- "Stopping scheduled fetch runnable for stream[%s] partition[%s]",
- streamPartition.getStream(),
- streamPartition.getPartitionId()
- );
- stopRequested = true;
+ scheduleBackgroundFetch(fetchDelayMillis);
+ }
}
- long getPartitionTimeLag()
+ private void stopBackgroundFetch()
{
- return currentLagMillis;
+ if (fetchStarted.compareAndSet(true, false)) {
+ log.debug(
+ "Stopping scheduled fetch for stream[%s] partition[%s]",
+ streamPartition.getStream(),
+ streamPartition.getPartitionId()
+ );
+ if (currentFetch != null && !currentFetch.isDone()) {
+ currentFetch.cancel(true);
+ }
+ }
}
- long getPartitionTimeLag(String offset)
+ private void scheduleBackgroundFetch(long delayMillis)
{
- // if not started (fetching records in background), fetch lag ourself
with a throw-away iterator
- if (!started) {
+ if (fetchStarted.get()) {
try {
- final String iteratorType;
- final String offsetToUse;
- if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) {
- // this should probably check if will start processing earliest or
latest rather than assuming earliest
- // if latest we could skip this because latest will not be behind
latest so lag is 0.
- iteratorType = ShardIteratorType.TRIM_HORIZON.toString();
- offsetToUse = null;
- } else {
- iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
- offsetToUse = offset;
- }
- String shardIterator = kinesis.getShardIterator(
- streamPartition.getStream(),
- streamPartition.getPartitionId(),
- iteratorType,
- offsetToUse
- ).getShardIterator();
-
- GetRecordsResult recordsResult = kinesis.getRecords(
- new
GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch)
- );
-
- currentLagMillis = recordsResult.getMillisBehindLatest();
- return currentLagMillis;
+ currentFetch = scheduledExec.schedule(fetchRecords(), delayMillis,
TimeUnit.MILLISECONDS);
}
- catch (Exception ex) {
- // eat it
+ catch (RejectedExecutionException e) {
log.warn(
- ex,
- "Failed to determine partition lag for partition %s of stream
%s",
- streamPartition.getPartitionId(),
- streamPartition.getStream()
+ e,
+ "Caught RejectedExecutionException, KinesisRecordSupplier for
partition[%s] has likely temporarily shutdown the ExecutorService. "
+ + "This is expected behavior after calling seek(),
seekToEarliest() and seekToLatest()",
+ streamPartition.getPartitionId()
);
+
}
+ } else {
+ log.debug("Worker for partition[%s] is already stopped",
streamPartition.getPartitionId());
}
- return currentLagMillis;
}
- private Runnable getRecordRunnable()
+ private Runnable fetchRecords()
{
return () -> {
-
- if (stopRequested) {
- started = false;
- stopRequested = false;
-
- log.info("Worker for partition[%s] has been stopped",
streamPartition.getPartitionId());
+ if (!fetchStarted.get()) {
+ log.debug("Worker for partition[%s] has been stopped",
streamPartition.getPartitionId());
return;
}
@@ -231,7 +252,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
if (!records.offer(currRecord, recordBufferOfferTimeout,
TimeUnit.MILLISECONDS)) {
log.warn("OrderedPartitionableRecord buffer full, retrying in
[%,dms]", recordBufferFullWait);
- rescheduleRunnable(recordBufferFullWait);
+ scheduleBackgroundFetch(recordBufferFullWait);
}
return;
@@ -298,14 +319,14 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
currRecord.getSequenceNumber()
).getShardIterator();
- rescheduleRunnable(recordBufferFullWait);
+ scheduleBackgroundFetch(recordBufferFullWait);
return;
}
}
shardIterator = recordsResult.getNextShardIterator(); // will be
null if the shard has been closed
- rescheduleRunnable(fetchDelayMillis);
+ scheduleBackgroundFetch(fetchDelayMillis);
}
catch (ProvisionedThroughputExceededException e) {
log.warn(
@@ -315,7 +336,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
+ "the available throughput. Reduce the frequency or size of
your requests."
);
long retryMs = Math.max(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS,
fetchDelayMillis);
- rescheduleRunnable(retryMs);
+ scheduleBackgroundFetch(retryMs);
}
catch (InterruptedException e) {
// may happen if interrupted while BlockingQueue.offer() is waiting
@@ -324,7 +345,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
"Interrupted while waiting to add record to buffer, retrying in
[%,dms]",
EXCEPTION_RETRY_DELAY_MS
);
- rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS);
+ scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS);
}
catch (ExpiredIteratorException e) {
log.warn(
@@ -334,7 +355,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
);
if (recordsResult != null) {
shardIterator = recordsResult.getNextShardIterator(); // will be
null if the shard has been closed
- rescheduleRunnable(fetchDelayMillis);
+ scheduleBackgroundFetch(fetchDelayMillis);
} else {
throw new ISE("can't reschedule fetch records runnable,
recordsResult is null??");
}
@@ -347,7 +368,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
catch (AmazonServiceException e) {
if (isServiceExceptionRecoverable(e)) {
log.warn(e, "encounted unknown recoverable AWS exception, retrying
in [%,dms]", EXCEPTION_RETRY_DELAY_MS);
- rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS);
+ scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS);
} else {
log.warn(e, "encounted unknown unrecoverable AWS exception, will
not retry");
throw new RuntimeException(e);
@@ -355,31 +376,32 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
}
catch (Throwable e) {
// non transient errors
- log.error(e, "unknown getRecordRunnable exception, will not retry");
+ log.error(e, "unknown fetchRecords exception, will not retry");
throw new RuntimeException(e);
}
};
}
- private void rescheduleRunnable(long delayMillis)
+ private void seek(ShardIteratorType iteratorEnum, String sequenceNumber)
{
- if (started && !stopRequested) {
- try {
- scheduledExec.schedule(getRecordRunnable(), delayMillis,
TimeUnit.MILLISECONDS);
- }
- catch (RejectedExecutionException e) {
- log.warn(
- e,
- "Caught RejectedExecutionException, KinesisRecordSupplier for
partition[%s] has likely temporarily shutdown the ExecutorService. "
- + "This is expected behavior after calling seek(),
seekToEarliest() and seekToLatest()",
- streamPartition.getPartitionId()
- );
+ log.debug(
+ "Seeking partition [%s] to [%s]",
+ streamPartition.getPartitionId(),
+ sequenceNumber != null ? sequenceNumber : iteratorEnum.toString()
+ );
- }
- } else {
- log.info("Worker for partition[%s] has been stopped",
streamPartition.getPartitionId());
- }
+ shardIterator = wrapExceptions(() -> kinesis.getShardIterator(
+ streamPartition.getStream(),
+ streamPartition.getPartitionId(),
+ iteratorEnum.toString(),
+ sequenceNumber
+ ).getShardIterator());
+ }
+
+ private long getPartitionTimeLag()
+ {
+ return currentLagMillis;
}
}
@@ -398,6 +420,7 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
private final int maxRecordsPerPoll;
private final int fetchThreads;
private final int recordBufferSize;
+ private final boolean useEarliestSequenceNumber;
private ScheduledExecutorService scheduledExec;
@@ -405,8 +428,9 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
new ConcurrentHashMap<>();
private BlockingQueue<OrderedPartitionableRecord<String, String>> records;
- private volatile boolean checkPartitionsStarted = false;
+ private final boolean backgroundFetchEnabled;
private volatile boolean closed = false;
+ private AtomicBoolean partitionsFetchStarted = new AtomicBoolean();
public KinesisRecordSupplier(
AmazonKinesis amazonKinesis,
@@ -418,7 +442,8 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
int recordBufferOfferTimeout,
int recordBufferFullWait,
int fetchSequenceNumberTimeout,
- int maxRecordsPerPoll
+ int maxRecordsPerPoll,
+ boolean useEarliestSequenceNumber
)
{
Preconditions.checkNotNull(amazonKinesis);
@@ -432,6 +457,8 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
this.maxRecordsPerPoll = maxRecordsPerPoll;
this.fetchThreads = fetchThreads;
this.recordBufferSize = recordBufferSize;
+ this.useEarliestSequenceNumber = useEarliestSequenceNumber;
+ this.backgroundFetchEnabled = fetchThreads > 0;
// the deaggregate function is implemented by the amazon-kinesis-client,
whose license is not compatible with Apache.
// The work around here is to use reflection to find the deaggregate
function in the classpath. See details on the
@@ -459,16 +486,18 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
getDataHandle = null;
}
- log.info(
- "Creating fetch thread pool of size [%d]
(Runtime.availableProcessors=%d)",
- fetchThreads,
- Runtime.getRuntime().availableProcessors()
- );
+ if (backgroundFetchEnabled) {
+ log.info(
+ "Creating fetch thread pool of size [%d]
(Runtime.availableProcessors=%d)",
+ fetchThreads,
+ Runtime.getRuntime().availableProcessors()
+ );
- scheduledExec = Executors.newScheduledThreadPool(
- fetchThreads,
- Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
- );
+ scheduledExec = Executors.newScheduledThreadPool(
+ fetchThreads,
+ Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
+ );
+ }
records = new LinkedBlockingQueue<>(recordBufferSize);
}
@@ -517,13 +546,36 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
public void start()
{
checkIfClosed();
- if (checkPartitionsStarted) {
+ if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(false,
true)) {
partitionResources.values().forEach(PartitionResource::startBackgroundFetch);
- checkPartitionsStarted = false;
}
}
@Override
+ public void close()
+ {
+ if (this.closed) {
+ return;
+ }
+
+ assign(ImmutableSet.of());
+
+ scheduledExec.shutdown();
+
+ try {
+ if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS,
TimeUnit.MILLISECONDS)) {
+ scheduledExec.shutdownNow();
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn(e, "InterruptedException while shutting down");
+ throw new RuntimeException(e);
+ }
+
+ this.closed = true;
+ }
+
+ @Override
public void assign(Set<StreamPartition<String>> collection)
{
checkIfClosed();
@@ -535,56 +587,55 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
)
);
- for (Iterator<Map.Entry<StreamPartition<String>, PartitionResource>> i =
partitionResources.entrySet()
-
.iterator(); i.hasNext(); ) {
+ Iterator<Map.Entry<StreamPartition<String>, PartitionResource>> i =
partitionResources.entrySet().iterator();
+ while (i.hasNext()) {
Map.Entry<StreamPartition<String>, PartitionResource> entry = i.next();
if (!collection.contains(entry.getKey())) {
i.remove();
entry.getValue().stopBackgroundFetch();
}
}
+ }
+ @Override
+ public Collection<StreamPartition<String>> getAssignment()
+ {
+ return partitionResources.keySet();
}
@Override
public void seek(StreamPartition<String> partition, String sequenceNumber)
throws InterruptedException
{
- checkIfClosed();
- filterBufferAndResetFetchRunnable(ImmutableSet.of(partition));
- seekInternal(partition, sequenceNumber,
ShardIteratorType.AT_SEQUENCE_NUMBER);
+ filterBufferAndResetBackgroundFetch(ImmutableSet.of(partition));
+ partitionSeek(partition, sequenceNumber,
ShardIteratorType.AT_SEQUENCE_NUMBER);
}
@Override
public void seekToEarliest(Set<StreamPartition<String>> partitions) throws
InterruptedException
{
- checkIfClosed();
- filterBufferAndResetFetchRunnable(partitions);
- partitions.forEach(partition -> seekInternal(partition, null,
ShardIteratorType.TRIM_HORIZON));
+ filterBufferAndResetBackgroundFetch(partitions);
+ partitions.forEach(partition -> partitionSeek(partition, null,
ShardIteratorType.TRIM_HORIZON));
}
@Override
public void seekToLatest(Set<StreamPartition<String>> partitions) throws
InterruptedException
{
- checkIfClosed();
- filterBufferAndResetFetchRunnable(partitions);
- partitions.forEach(partition -> seekInternal(partition, null,
ShardIteratorType.LATEST));
+ filterBufferAndResetBackgroundFetch(partitions);
+ partitions.forEach(partition -> partitionSeek(partition, null,
ShardIteratorType.LATEST));
}
+ @Nullable
@Override
- public Collection<StreamPartition<String>> getAssignment()
+ public String getPosition(StreamPartition<String> partition)
{
- return partitionResources.keySet();
+ throw new UnsupportedOperationException("getPosition() is not supported in
Kinesis");
}
@Nonnull
@Override
public List<OrderedPartitionableRecord<String, String>> poll(long timeout)
{
- checkIfClosed();
- if (checkPartitionsStarted) {
-
partitionResources.values().forEach(PartitionResource::startBackgroundFetch);
- checkPartitionsStarted = false;
- }
+ start();
try {
int expectedSize = Math.min(Math.max(records.size(), 1),
maxRecordsPerPoll);
@@ -616,23 +667,14 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
@Override
public String getLatestSequenceNumber(StreamPartition<String> partition)
{
- checkIfClosed();
- return getSequenceNumberInternal(partition, ShardIteratorType.LATEST);
+ return getSequenceNumber(partition, ShardIteratorType.LATEST);
}
@Nullable
@Override
public String getEarliestSequenceNumber(StreamPartition<String> partition)
{
- checkIfClosed();
- return getSequenceNumberInternal(partition,
ShardIteratorType.TRIM_HORIZON);
- }
-
- @Nullable
- @Override
- public String getPosition(StreamPartition<String> partition)
- {
- throw new UnsupportedOperationException("getPosition() is not supported in
Kinesis");
+ return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}
@Override
@@ -665,180 +707,179 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
);
}
- @Override
- public void close()
+ /**
+ * Fetch the partition lag, given a stream and set of current partition
offsets. This operates independently from
+ * the {@link PartitionResource} which have been assigned to this record
supplier.
+ */
+ public Map<String, Long> getPartitionsTimeLag(String stream, Map<String,
String> currentOffsets)
{
- if (this.closed) {
- return;
- }
-
- assign(ImmutableSet.of());
-
- scheduledExec.shutdown();
-
- try {
- if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS,
TimeUnit.MILLISECONDS)) {
- scheduledExec.shutdownNow();
- }
- }
- catch (InterruptedException e) {
- log.warn(e, "InterruptedException while shutting down");
- throw new RuntimeException(e);
+ Map<String, Long> partitionLag =
Maps.newHashMapWithExpectedSize(currentOffsets.size());
+ for (Map.Entry<String, String> partitionOffset :
currentOffsets.entrySet()) {
+ StreamPartition<String> partition = new StreamPartition<>(stream,
partitionOffset.getKey());
+ long currentLag = getPartitionTimeLag(partition,
partitionOffset.getValue());
+ partitionLag.put(partitionOffset.getKey(), currentLag);
}
-
- this.closed = true;
+ return partitionLag;
}
- // this is only used for tests
+ /**
+ * This method is only used for tests to verify that {@link
PartitionResource} in fact tracks it's current lag
+ * as it is polled for records. This isn't currently used in production at
all, but could be one day if we were
+ * to prefer to get the lag from the running tasks in the same API call
which fetches the current task offsets,
+ * instead of directly calling the AWS Kinesis API with the offsets returned
from those tasks
+ * (see {@link #getPartitionsTimeLag}, which accepts a map of current
partition offsets).
+ */
@VisibleForTesting
- Map<String, Long> getPartitionTimeLag()
+ Map<String, Long> getPartitionResourcesTimeLag()
{
return partitionResources.entrySet()
.stream()
.collect(
- Collectors.toMap(k ->
k.getKey().getPartitionId(), k -> k.getValue().getPartitionTimeLag())
+ Collectors.toMap(
+ k -> k.getKey().getPartitionId(),
+ k -> k.getValue().getPartitionTimeLag()
+ )
);
}
- public Map<String, Long> getPartitionTimeLag(Map<String, String>
currentOffsets)
+ @VisibleForTesting
+ public int bufferSize()
{
- Map<String, Long> partitionLag =
Maps.newHashMapWithExpectedSize(currentOffsets.size());
- for (Map.Entry<StreamPartition<String>, PartitionResource> partition :
partitionResources.entrySet()) {
- final String partitionId = partition.getKey().getPartitionId();
- partitionLag.put(partitionId,
partition.getValue().getPartitionTimeLag(currentOffsets.get(partitionId)));
- }
- return partitionLag;
+ return records.size();
+ }
+
+ @VisibleForTesting
+ public boolean isBackgroundFetchRunning()
+ {
+ return partitionsFetchStarted.get();
}
- private void seekInternal(StreamPartition<String> partition, String
sequenceNumber, ShardIteratorType iteratorEnum)
+ /**
+ * Check that a {@link PartitionResource} has been assigned to this record
supplier, and if so call
+ * {@link PartitionResource#seek} to move it to the requested position. Note
that this method does not restart background
+ * fetch, which should have been stopped prior to calling this method by a
call to
+ * {@link #filterBufferAndResetBackgroundFetch}.
+ */
+ private void partitionSeek(StreamPartition<String> partition, String
sequenceNumber, ShardIteratorType iteratorEnum)
{
PartitionResource resource = partitionResources.get(partition);
if (resource == null) {
throw new ISE("Partition [%s] has not been assigned", partition);
}
-
- log.debug(
- "Seeking partition [%s] to [%s]",
- partition.getPartitionId(),
- sequenceNumber != null ? sequenceNumber : iteratorEnum.toString()
- );
-
- resource.shardIterator = wrapExceptions(() -> kinesis.getShardIterator(
- partition.getStream(),
- partition.getPartitionId(),
- iteratorEnum.toString(),
- sequenceNumber
- ).getShardIterator());
-
- checkPartitionsStarted = true;
+ resource.seek(iteratorEnum, sequenceNumber);
}
- private void filterBufferAndResetFetchRunnable(Set<StreamPartition<String>>
partitions) throws InterruptedException
+ /**
+ * Given a partition and a {@link ShardIteratorType}, create a shard
iterator and fetch
+ * {@link #GET_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first
sequence number from the result set.
+ * This method is thread safe as it does not depend on the internal state of
the supplier (it doesn't use the
+ * {@link PartitionResource} which have been assigned to the supplier), and
the Kinesis client is thread safe.
+ */
+ @Nullable
+ private String getSequenceNumber(StreamPartition<String> partition,
ShardIteratorType iteratorEnum)
{
- scheduledExec.shutdown();
-
- try {
- if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS,
TimeUnit.MILLISECONDS)) {
- scheduledExec.shutdownNow();
- }
- }
- catch (InterruptedException e) {
- log.warn(e, "InterruptedException while shutting down");
- throw e;
- }
+ return wrapExceptions(() -> {
+ String shardIterator =
+ kinesis.getShardIterator(partition.getStream(),
partition.getPartitionId(), iteratorEnum.toString())
+ .getShardIterator();
+ long timeoutMillis = System.currentTimeMillis() +
fetchSequenceNumberTimeout;
+ GetRecordsResult recordsResult = null;
+
+ while (shardIterator != null && System.currentTimeMillis() <
timeoutMillis) {
+
+ if (closed) {
+ log.info("KinesisRecordSupplier closed while fetching
sequenceNumber");
+ return null;
+ }
+ final String currentShardIterator = shardIterator;
+ final GetRecordsRequest request = new
GetRecordsRequest().withShardIterator(currentShardIterator)
+
.withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT);
+ recordsResult = RetryUtils.retry(
+ () -> kinesis.getRecords(request),
+ (throwable) -> {
+ if (throwable instanceof ProvisionedThroughputExceededException)
{
+ log.warn(
+ throwable,
+ "encountered ProvisionedThroughputExceededException while
fetching records, this means "
+ + "that the request rate for the stream is too high, or
the requested data is too large for "
+ + "the available throughput. Reduce the frequency or size
of your requests. Consider increasing "
+ + "the number of shards to increase throughput."
+ );
+ return true;
+ }
+ return false;
+ },
+ GET_SEQUENCE_NUMBER_RETRY_COUNT
+ );
- scheduledExec = Executors.newScheduledThreadPool(
- fetchThreads,
- Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
- );
+ List<Record> records = recordsResult.getRecords();
- // filter records in buffer and only retain ones whose partition was not
seeked
- BlockingQueue<OrderedPartitionableRecord<String, String>> newQ = new
LinkedBlockingQueue<>(recordBufferSize);
+ if (!records.isEmpty()) {
+ return records.get(0).getSequenceNumber();
+ }
- records.stream()
- .filter(x -> !partitions.contains(x.getStreamPartition()))
- .forEachOrdered(newQ::offer);
+ shardIterator = recordsResult.getNextShardIterator();
+ }
- records = newQ;
+ if (shardIterator == null) {
+ log.info("Partition[%s] returned a null shard iterator, is the shard
closed?", partition.getPartitionId());
+ return KinesisSequenceNumber.END_OF_SHARD_MARKER;
+ }
- // restart fetching threads
- partitionResources.values().forEach(x -> x.started = false);
- checkPartitionsStarted = true;
- }
- @Nullable
- private String getSequenceNumberInternal(StreamPartition<String> partition,
ShardIteratorType iteratorEnum)
- {
- return wrapExceptions(() -> getSequenceNumberInternal(
- partition,
- kinesis.getShardIterator(partition.getStream(),
partition.getPartitionId(), iteratorEnum.toString())
- .getShardIterator()
- ));
+ // if we reach here, it usually means either the shard has no more
records, or records have not been
+ // added to this shard
+ log.warn(
+ "timed out while trying to fetch position for shard[%s],
millisBehindLatest is [%s], likely no more records in shard",
+ partition.getPartitionId(),
+ recordsResult != null ? recordsResult.getMillisBehindLatest() :
"UNKNOWN"
+ );
+ return null;
+ });
}
- @Nullable
- private String getSequenceNumberInternal(StreamPartition<String> partition,
String shardIterator)
+ /**
+ * Given a {@link StreamPartition} and an offset, create a 'shard iterator'
for the offset and fetch a single record
+ * in order to get the lag: {@link
GetRecordsResult#getMillisBehindLatest()}. This method is thread safe as it does
+ * not depend on the internal state of the supplier (it doesn't use the
{@link PartitionResource} which have been
+ * assigned to the supplier), and the Kinesis client is thread safe.
+ */
+ private Long getPartitionTimeLag(StreamPartition<String> partition, String
offset)
{
- long timeoutMillis = System.currentTimeMillis() +
fetchSequenceNumberTimeout;
- GetRecordsResult recordsResult = null;
-
- while (shardIterator != null && System.currentTimeMillis() <
timeoutMillis) {
-
- if (closed) {
- log.info("KinesisRecordSupplier closed while fetching sequenceNumber");
- return null;
- }
- try {
- // we call getRecords with limit 1000 to make sure that we can find
the first (earliest) record in the shard.
- // In the case where the shard is constantly removing records that are
past their retention period, it is possible
- // that we never find the first record in the shard if we use a limit
of 1.
- recordsResult = kinesis.getRecords(new
GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000));
- }
- catch (ProvisionedThroughputExceededException e) {
- log.warn(
- e,
- "encountered ProvisionedThroughputExceededException while fetching
records, this means "
- + "that the request rate for the stream is too high, or the
requested data is too large for "
- + "the available throughput. Reduce the frequency or size of your
requests. Consider increasing "
- + "the number of shards to increase throughput."
- );
- try {
- Thread.sleep(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS);
- continue;
- }
- catch (InterruptedException e1) {
- log.warn(e1, "Thread interrupted!");
- Thread.currentThread().interrupt();
- break;
+ return wrapExceptions(() -> {
+ final String iteratorType;
+ final String offsetToUse;
+ if (offset == null || KinesisSupervisor.OFFSET_NOT_SET.equals(offset)) {
+ if (useEarliestSequenceNumber) {
+ iteratorType = ShardIteratorType.TRIM_HORIZON.toString();
+ offsetToUse = null;
+ } else {
+ // if offset is not set and not using earliest, it means we will
start reading from latest,
+ // so lag will be 0 and we have nothing to do here
+ return 0L;
}
+ } else {
+ iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
+ offsetToUse = offset;
}
+ String shardIterator = kinesis.getShardIterator(
+ partition.getStream(),
+ partition.getPartitionId(),
+ iteratorType,
+ offsetToUse
+ ).getShardIterator();
+
+ GetRecordsResult recordsResult = kinesis.getRecords(
+ new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
+ );
- List<Record> records = recordsResult.getRecords();
-
- if (!records.isEmpty()) {
- return records.get(0).getSequenceNumber();
- }
-
- shardIterator = recordsResult.getNextShardIterator();
- }
-
- if (shardIterator == null) {
- log.info("Partition[%s] returned a null shard iterator, is the shard
closed?", partition.getPartitionId());
- return KinesisSequenceNumber.END_OF_SHARD_MARKER;
- }
-
-
- // if we reach here, it usually means either the shard has no more
records, or records have not been
- // added to this shard
- log.warn(
- "timed out while trying to fetch position for shard[%s],
millisBehindLatest is [%s], likely no more records in shard",
- partition.getPartitionId(),
- recordsResult != null ? recordsResult.getMillisBehindLatest() :
"UNKNOWN"
- );
- return null;
-
+ return recordsResult.getMillisBehindLatest();
+ });
}
+ /**
+ * Explode if {@link #close()} has been called on the supplier.
+ */
private void checkIfClosed()
{
if (closed) {
@@ -847,36 +888,46 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String>
}
/**
- * Returns an array with the content between the position and limit of
"buffer". This may be the buffer's backing
- * array itself. Does not modify position or limit of the buffer.
+ * This method must be called before a seek operation ({@link #seek}, {@link
#seekToLatest}, or
+ * {@link #seekToEarliest}).
+ *
+ * When called, it shuts down and re-creates the {@link #scheduledExec} that is shared by
all {@link PartitionResource}, filters
+ * records from the buffer for partitions which will have a seek operation
performed, and stops background fetch for
+ * each {@link PartitionResource} to prepare for the seek. If background
fetch is not currently running, the
+ * {@link #scheduledExec} will not be re-created.
*/
- private static byte[] toByteArray(final ByteBuffer buffer)
+ private void
filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> partitions)
throws InterruptedException
{
- if (buffer.hasArray()
- && buffer.arrayOffset() == 0
- && buffer.position() == 0
- && buffer.array().length == buffer.limit()) {
- return buffer.array();
- } else {
- final byte[] retVal = new byte[buffer.remaining()];
- buffer.duplicate().get(retVal);
- return retVal;
- }
- }
+ checkIfClosed();
+ if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(true,
false)) {
+ scheduledExec.shutdown();
- private static <T> T wrapExceptions(Callable<T> callable)
- {
- try {
- return callable.call();
- }
- catch (Exception e) {
- throw new StreamException(e);
+ try {
+ if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS,
TimeUnit.MILLISECONDS)) {
+ scheduledExec.shutdownNow();
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn(e, "InterruptedException while shutting down");
+ throw e;
+ }
+
+ scheduledExec = Executors.newScheduledThreadPool(
+ fetchThreads,
+ Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
+ );
}
- }
- @VisibleForTesting
- public int bufferSize()
- {
- return records.size();
+ // filter records in buffer and only retain ones whose partition was not
seeked
+ BlockingQueue<OrderedPartitionableRecord<String, String>> newQ = new
LinkedBlockingQueue<>(recordBufferSize);
+
+ records.stream()
+ .filter(x -> !partitions.contains(x.getStreamPartition()))
+ .forEachOrdered(newQ::offer);
+
+ records = newQ;
+
+ // stop background fetch for each partition resource to prepare for the seek
+ partitionResources.values().forEach(x -> x.stopBackgroundFetch());
}
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
index 664b3b5..401efe7 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
@@ -71,7 +71,8 @@ public class KinesisSamplerSpec extends
SeekableStreamSamplerSpec
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getFetchSequenceNumberTimeout(),
- tuningConfig.getMaxRecordsPerPoll()
+ tuningConfig.getMaxRecordsPerPoll(),
+ ioConfig.isUseEarliestSequenceNumber()
);
}
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
index 4bd8fe8..ab13f5b 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
@@ -36,27 +36,33 @@ public class KinesisSequenceNumber extends
OrderedSequenceNumber<String>
*/
public static final String END_OF_SHARD_MARKER = "EOS";
- // this special marker is used by the KinesisSupervisor to set the endOffsets
- // of newly created indexing tasks. This is necessary because streaming
tasks do not
- // have endPartitionOffsets. This marker signals to the task that it should
continue
- // to ingest data until taskDuration has elapsed or the task was stopped or
paused or killed
+
+ /**
+ * This special marker is used by the KinesisSupervisor to set the
endOffsets of newly created indexing tasks. This
+ * is necessary because streaming tasks do not have endPartitionOffsets.
This marker signals to the task that it
+ * should continue to ingest data until taskDuration has elapsed or the
task was stopped or paused or killed.
+ */
public static final String NO_END_SEQUENCE_NUMBER = "NO_END_SEQUENCE_NUMBER";
- // this special marker is used by the KinesisSupervisor to mark that a shard
has been expired
- // (i.e., closed and then the retention period has passed)
+
+ /**
+ * This special marker is used by the KinesisSupervisor to mark that a shard
has been expired
+ * (i.e., closed and then the retention period has passed)
+ */
public static final String EXPIRED_MARKER = "EXPIRED";
- // this flag is used to indicate either END_OF_SHARD_MARKER
- // or NO_END_SEQUENCE_NUMBER so that they can be properly compared
- // with other sequence numbers
+ /**
+ * This flag is used to indicate either END_OF_SHARD_MARKER
+ * or NO_END_SEQUENCE_NUMBER so that they can be properly compared
+ * with other sequence numbers
+ */
private final boolean isMaxSequenceNumber;
private final BigInteger intSequence;
private KinesisSequenceNumber(String sequenceNumber, boolean isExclusive)
{
super(sequenceNumber, isExclusive);
- if (END_OF_SHARD_MARKER.equals(sequenceNumber)
- || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) {
+ if (END_OF_SHARD_MARKER.equals(sequenceNumber) ||
NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) {
isMaxSequenceNumber = true;
this.intSequence = null;
} else {
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 0000ee6..494e901 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.indexer.TaskIdUtils;
@@ -49,7 +48,6 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
-import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
@@ -64,6 +62,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.stream.Collectors;
/**
* Supervisor responsible for managing the KinesisIndexTask for a single
dataSource. At a high level, the class accepts a
@@ -82,7 +81,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
{
};
- public static final String NOT_SET = "-1";
+ public static final String OFFSET_NOT_SET = "-1";
private final KinesisSupervisorSpec spec;
private final AWSCredentialsConfig awsCredentialsConfig;
private volatile Map<String, Long> currentPartitionTimeLag;
@@ -167,7 +166,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
- String taskId = Joiner.on("_").join(baseSequenceName,
TaskIdUtils.getRandomId());
+ String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KinesisIndexTask(
taskId,
new TaskResource(baseSequenceName, 1),
@@ -187,8 +186,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
@Override
- protected RecordSupplier<String, String> setupRecordSupplier()
- throws RuntimeException
+ protected RecordSupplier<String, String> setupRecordSupplier() throws
RuntimeException
{
KinesisSupervisorIOConfig ioConfig = spec.getIoConfig();
KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig();
@@ -202,13 +200,14 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
),
ioConfig.getRecordsPerFetch(),
ioConfig.getFetchDelayMillis(),
- 1,
+ 0, // skip starting background fetch, it is not used
ioConfig.isDeaggregate(),
taskTuningConfig.getRecordBufferSize(),
taskTuningConfig.getRecordBufferOfferTimeout(),
taskTuningConfig.getRecordBufferFullWait(),
taskTuningConfig.getFetchSequenceNumberTimeout(),
- taskTuningConfig.getMaxRecordsPerPoll()
+ taskTuningConfig.getMaxRecordsPerPoll(),
+ ioConfig.isUseEarliestSequenceNumber()
);
}
@@ -296,7 +295,19 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
@Override
protected Map<String, Long> getTimeLagPerPartition(Map<String, String>
currentOffsets)
{
- return ((KinesisRecordSupplier)
recordSupplier).getPartitionTimeLag(currentOffsets);
+ return currentOffsets
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue() != null &&
+ currentPartitionTimeLag != null &&
+ currentPartitionTimeLag.get(e.getKey()) != null
+ )
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ e -> currentPartitionTimeLag.get(e.getKey())
+ )
+ );
}
@Override
@@ -315,13 +326,11 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
}
@Override
- protected void updateLatestSequenceFromStream(
- RecordSupplier<String, String> recordSupplier,
- Set<StreamPartition<String>> streamPartitions
- )
+ protected void updatePartitionLagFromStream()
{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
- currentPartitionTimeLag =
supplier.getPartitionTimeLag(getHighestCurrentOffsets());
+ // this recordSupplier method is thread safe, so does not need to acquire
the recordSupplierLock
+ currentPartitionTimeLag =
supplier.getPartitionsTimeLag(getIoConfig().getStream(),
getHighestCurrentOffsets());
}
@Override
@@ -345,7 +354,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
@Override
protected String getNotSetMarker()
{
- return NOT_SET;
+ return OFFSET_NOT_SET;
}
@Override
@@ -454,7 +463,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
}
}
- newSequences = new SeekableStreamStartSequenceNumbers<String, String>(
+ newSequences = new SeekableStreamStartSequenceNumbers<>(
old.getStream(),
null,
newPartitionSequenceNumberMap,
@@ -462,7 +471,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String>
newExclusiveStartPartitions
);
} else {
- newSequences = new SeekableStreamEndSequenceNumbers<String, String>(
+ newSequences = new SeekableStreamEndSequenceNumbers<>(
old.getStream(),
null,
newPartitionSequenceNumberMap,
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index c886065..4daa2fd 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2034,7 +2034,7 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
.andReturn(Collections.emptyList())
.anyTimes();
- EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
+ EasyMock.expect(recordSupplier.getPartitionsTimeLag(EasyMock.anyString(),
EasyMock.anyObject()))
.andReturn(null)
.anyTimes();
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index 4a631e8..6b2f32f 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -203,7 +203,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
5000,
5000,
60000,
- 5
+ 5,
+ true
);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@@ -212,6 +213,9 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(ImmutableSet.of(SHARD_ID1, SHARD_ID0),
recordSupplier.getPartitionIds(STREAM));
+
+ // calling poll would start background fetch if seek had been called, but since seek was not called, fetch
is skipped and the results are
+ // empty
Assert.assertEquals(Collections.emptyList(), recordSupplier.poll(100));
verifyAll();
@@ -290,7 +294,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
5000,
5000,
60000,
- 100
+ 100,
+ true
);
recordSupplier.assign(partitions);
@@ -308,7 +313,7 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
- Assert.assertEquals(SHARDS_LAG_MILLIS,
recordSupplier.getPartitionTimeLag());
+ Assert.assertEquals(SHARDS_LAG_MILLIS,
recordSupplier.getPartitionResourcesTimeLag());
}
@Test
@@ -367,7 +372,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
5000,
5000,
60000,
- 100
+ 100,
+ true
);
recordSupplier.assign(partitions);
@@ -386,7 +392,7 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
Assert.assertEquals(9, polledRecords.size());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(4, 12)));
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(1, 2)));
- Assert.assertEquals(SHARDS_LAG_MILLIS,
recordSupplier.getPartitionTimeLag());
+ Assert.assertEquals(SHARDS_LAG_MILLIS,
recordSupplier.getPartitionResourcesTimeLag());
}
@@ -434,7 +440,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
5000,
5000,
60000,
- 100
+ 100,
+ true
);
recordSupplier.assign(partitions);
@@ -468,7 +475,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
5000,
5000,
60000,
- 5
+ 5,
+ true
);
recordSupplier.assign(partitions);
@@ -530,7 +538,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
5000,
5000,
60000,
- 1
+ 1,
+ true
);
recordSupplier.assign(partitions);
@@ -549,7 +558,7 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
);
// only one partition in this test. first results come from
getRecordsResult1, which has SHARD1_LAG_MILLIS
- Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS),
recordSupplier.getPartitionTimeLag());
+ Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS),
recordSupplier.getPartitionResourcesTimeLag());
recordSupplier.seek(StreamPartition.of(STREAM, SHARD_ID1), "7");
recordSupplier.start();
@@ -563,7 +572,7 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
Assert.assertEquals(ALL_RECORDS.get(9), record2);
// only one partition in this test. second results come from
getRecordsResult0, which has SHARD0_LAG_MILLIS
- Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS),
recordSupplier.getPartitionTimeLag());
+ Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS),
recordSupplier.getPartitionResourcesTimeLag());
verifyAll();
}
@@ -622,7 +631,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
5000,
5000,
60000,
- 100
+ 100,
+ true
);
recordSupplier.assign(partitions);
@@ -640,7 +650,7 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
- Assert.assertEquals(SHARDS_LAG_MILLIS,
recordSupplier.getPartitionTimeLag());
+ Assert.assertEquals(SHARDS_LAG_MILLIS,
recordSupplier.getPartitionResourcesTimeLag());
}
@Test
@@ -692,7 +702,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
5000,
5000,
1000,
- 100
+ 100,
+ true
);
return recordSupplier;
}
@@ -705,16 +716,14 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
EasyMock.eq(SHARD_ID0),
EasyMock.anyString(),
EasyMock.anyString()
- )).andReturn(
- getShardIteratorResult0).anyTimes();
+ )).andReturn(getShardIteratorResult0).anyTimes();
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID1),
EasyMock.anyString(),
EasyMock.anyString()
- )).andReturn(
- getShardIteratorResult1).anyTimes();
+ )).andReturn(getShardIteratorResult1).anyTimes();
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
@@ -728,8 +737,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
-
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
-
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
+
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).times(2);
+
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).times(2);
replayAll();
@@ -738,7 +747,6 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
StreamPartition.of(STREAM, SHARD_ID1)
);
-
recordSupplier = new KinesisRecordSupplier(
kinesis,
recordsPerFetch,
@@ -749,7 +757,8 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
5000,
5000,
60000,
- 100
+ 100,
+ true
);
recordSupplier.assign(partitions);
@@ -760,16 +769,19 @@ public class KinesisRecordSupplierTest extends
EasyMockSupport
Thread.sleep(100);
}
+ Map<String, Long> timeLag = recordSupplier.getPartitionResourcesTimeLag();
+
+
+ Assert.assertEquals(partitions, recordSupplier.getAssignment());
+ Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag);
+
Map<String, String> offsts = ImmutableMap.of(
SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(),
SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber()
);
- Map<String, Long> timeLag = recordSupplier.getPartitionTimeLag(offsts);
-
+ Map<String, Long> independentTimeLag =
recordSupplier.getPartitionsTimeLag(STREAM, offsts);
+ Assert.assertEquals(SHARDS_LAG_MILLIS, independentTimeLag);
verifyAll();
-
- Assert.assertEquals(partitions, recordSupplier.getAssignment());
- Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag);
}
/**
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 816e0aa..f3a21d9 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -129,7 +129,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
private static final StreamPartition<String> SHARD0_PARTITION =
StreamPartition.of(STREAM, SHARD_ID0);
private static final StreamPartition<String> SHARD1_PARTITION =
StreamPartition.of(STREAM, SHARD_ID1);
private static final StreamPartition<String> SHARD2_PARTITION =
StreamPartition.of(STREAM, SHARD_ID2);
- private static final Map<String, Long> TIME_LAG = ImmutableMap.of(SHARD_ID1,
9000L, SHARD_ID0, 1234L);
private static DataSchema dataSchema;
private KinesisRecordSupplier supervisorRecordSupplier;
@@ -232,9 +231,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -285,6 +281,68 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Test
+ public void testRecordSupplier()
+ {
+ KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new
KinesisSupervisorIOConfig(
+ STREAM,
+ INPUT_FORMAT,
+ "awsEndpoint",
+ null,
+ 1,
+ 1,
+ new Period("PT30M"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ 100,
+ 1000,
+ null,
+ null,
+ false
+ );
+ KinesisIndexTaskClientFactory clientFactory = new
KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
+ KinesisSupervisor supervisor = new KinesisSupervisor(
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ clientFactory,
+ OBJECT_MAPPER,
+ new KinesisSupervisorSpec(
+ null,
+ dataSchema,
+ tuningConfig,
+ kinesisSupervisorIOConfig,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ clientFactory,
+ OBJECT_MAPPER,
+ new NoopServiceEmitter(),
+ new DruidMonitorSchedulerConfig(),
+ rowIngestionMetersFactory,
+ null,
+ new SupervisorStateManagerConfig()
+ ),
+ rowIngestionMetersFactory,
+ null
+ );
+
+ KinesisRecordSupplier supplier = (KinesisRecordSupplier)
supervisor.setupRecordSupplier();
+ Assert.assertNotNull(supplier);
+ Assert.assertEquals(0, supplier.bufferSize());
+ Assert.assertEquals(Collections.emptySet(), supplier.getAssignment());
+ // background fetch should not be enabled for supervisor supplier
+ supplier.start();
+ Assert.assertFalse(supplier.isBackgroundFetchRunning());
+ }
+
+ @Test
public void testMultiTask() throws Exception
{
supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
@@ -302,9 +360,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -364,9 +419,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -444,9 +496,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -499,9 +548,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -561,9 +607,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<KinesisIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -661,9 +704,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
// non KinesisIndexTask (don't kill)
Task id2 = new RealtimeIndexTask(
@@ -730,9 +770,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@@ -845,9 +882,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -959,9 +993,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
DateTime now = DateTimes.nowUtc();
DateTime maxi = now.plusMinutes(60);
@@ -1099,9 +1130,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -1228,9 +1256,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
final Capture<Task> firstTasks = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -1361,7 +1386,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
+
EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(),
EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
@@ -1453,9 +1478,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING,
payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());
- Assert.assertEquals(timeLag, payload.getMinimumLagMillis());
- Assert.assertEquals(20000000L, (long) payload.getAggregateLagMillis());
-
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
Assert.assertEquals("id1", publishingReport.getId());
@@ -1525,7 +1547,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
+
EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(),
EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
@@ -1605,9 +1627,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(1, payload.getPublishingTasks().size());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING,
payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());
- Assert.assertEquals(timeLag, payload.getMinimumLagMillis());
- Assert.assertEquals(9000L + 1234L, (long) payload.getAggregateLagMillis());
-
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
@@ -1681,7 +1700,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
+
EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(),
EasyMock.anyObject()))
.andReturn(timeLag)
.atLeastOnce();
Task id1 = createKinesisIndexTask(
@@ -1859,9 +1878,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -1944,9 +1960,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -2056,10 +2069,6 @@ public class KinesisSupervisorTest extends
EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
-
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
@@ -2207,9 +2216,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@@ -2618,9 +2624,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@@ -2847,9 +2850,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
@@ -3005,9 +3005,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -3261,9 +3258,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Task id1 = createKinesisIndexTask(
"id1",
@@ -3482,7 +3476,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Test
- public void testDoNotKillCompatibleTasks() throws InterruptedException,
EntryExistsException
+ public void testDoNotKillCompatibleTasks()
+ throws InterruptedException, EntryExistsException
{
// This supervisor always returns true for isTaskCurrent -> it should not
kill its tasks
int numReplicas = 2;
@@ -3512,9 +3507,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Task task = createKinesisIndexTask(
"id2",
@@ -3582,7 +3574,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Test
- public void testKillIncompatibleTasks() throws InterruptedException,
EntryExistsException
+ public void testKillIncompatibleTasks()
+ throws InterruptedException, EntryExistsException
{
// This supervisor always returns false for isTaskCurrent -> it should
kill its tasks
int numReplicas = 2;
@@ -3611,9 +3604,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Task task = createKinesisIndexTask(
"id1",
@@ -3856,9 +3846,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -3967,9 +3954,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
@@ -4145,9 +4129,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
@@ -4311,9 +4292,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -4433,9 +4411,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> postMergeCaptured = Capture.newInstance(CaptureType.ALL);
@@ -4590,9 +4565,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
-
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
- .andReturn(TIME_LAG)
- .atLeastOnce();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 247ff0e..cc4b986 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
@@ -108,6 +110,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -479,7 +482,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
private final Object stopLock = new Object();
private final Object stateChangeLock = new Object();
- private final Object recordSupplierLock = new Object();
+ private final ReentrantLock recordSupplierLock = new ReentrantLock();
private final boolean useExclusiveStartingSequence;
private boolean listenerRegistered = false;
@@ -706,6 +709,11 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
notices.add(new ResetNotice(dataSourceMetadata));
}
+ public ReentrantLock getRecordSupplierLock()
+ {
+ return recordSupplierLock;
+ }
+
@VisibleForTesting
public void tryInit()
@@ -1889,10 +1897,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
List<PartitionIdType> previousPartitionIds = new ArrayList<>(partitionIds);
Set<PartitionIdType> partitionIdsFromSupplier;
+ recordSupplierLock.lock();
try {
- synchronized (recordSupplierLock) {
- partitionIdsFromSupplier =
recordSupplier.getPartitionIds(ioConfig.getStream());
- }
+ partitionIdsFromSupplier =
recordSupplier.getPartitionIds(ioConfig.getStream());
}
catch (Exception e) {
stateManager.recordThrowableEvent(e);
@@ -1900,6 +1907,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.debug(e, "full stack trace");
return false;
}
+ finally {
+ recordSupplierLock.unlock();
+ }
if (partitionIdsFromSupplier == null || partitionIdsFromSupplier.size() ==
0) {
String errMsg = StringUtils.format("No partitions found for stream
[%s]", ioConfig.getStream());
@@ -1989,6 +1999,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
+ Int2ObjectMap<List<PartitionIdType>> newlyDiscovered = new
Int2ObjectLinkedOpenHashMap<>();
for (PartitionIdType partitionId : activePartitionsIdsFromSupplier) {
int taskGroupId = getTaskGroupIdForPartition(partitionId);
Set<PartitionIdType> partitionGroup = partitionGroups.computeIfAbsent(
@@ -1998,16 +2009,30 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
partitionGroup.add(partitionId);
if (partitionOffsets.putIfAbsent(partitionId, getNotSetMarker()) ==
null) {
- log.info(
+ log.debug(
"New partition [%s] discovered for stream [%s], added to task
group [%d]",
partitionId,
ioConfig.getStream(),
taskGroupId
);
+
+ newlyDiscovered.computeIfAbsent(taskGroupId,
ArrayList::new).add(partitionId);
+ }
+ }
+
+ if (newlyDiscovered.size() > 0) {
+ for (Int2ObjectMap.Entry<List<PartitionIdType>> taskGroupPartitions :
newlyDiscovered.int2ObjectEntrySet()) {
+ log.info(
+ "New partitions %s discovered for stream [%s], added to task group
[%s]",
+ taskGroupPartitions.getValue(),
+ ioConfig.getStream(),
+ taskGroupPartitions.getIntKey()
+ );
}
}
if (!partitionIds.equals(previousPartitionIds)) {
+ assignRecordSupplierToPartitionIds();
// the set of partition IDs has changed, have any running tasks stop
early so that we can adjust to the
// repartitioning quickly by creating new tasks
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
@@ -2034,6 +2059,28 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return true;
}
+ private void assignRecordSupplierToPartitionIds()
+ {
+ recordSupplierLock.lock();
+ try {
+ final Set partitions = partitionIds.stream()
+ .map(partitionId -> new
StreamPartition<>(ioConfig.getStream(), partitionId))
+ .collect(Collectors.toSet());
+ if (!recordSupplier.getAssignment().containsAll(partitions)) {
+ recordSupplier.assign(partitions);
+ try {
+ recordSupplier.seekToEarliest(partitions);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ finally {
+ recordSupplierLock.unlock();
+ }
+ }
+
/**
* This method determines the set of expired partitions from the set of
partitions currently returned by
* the record supplier and the set of partitions previously tracked in the
metadata.
@@ -2106,6 +2153,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
partitionIds.clear();
partitionIds.addAll(activePartitionsIdsFromSupplier);
+ assignRecordSupplierToPartitionIds();
for (Integer groupId : partitionGroups.keySet()) {
if (newPartitionGroups.containsKey(groupId)) {
@@ -2773,8 +2821,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return startingOffsets;
}
- private void createNewTasks()
- throws JsonProcessingException
+ private void createNewTasks() throws JsonProcessingException
{
// update the checkpoints in the taskGroup to latest ones so that new
tasks do not read what is already published
verifyAndMergeCheckpoints(
@@ -2993,7 +3040,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (sequence == null) {
throw new ISE("unable to fetch sequence number for partition[%s] from
stream", partition);
}
- log.info("Getting sequence number [%s] for partition [%s]", sequence,
partition);
+ log.debug("Getting sequence number [%s] for partition [%s]", sequence,
partition);
return makeSequenceNumber(sequence, false);
}
}
@@ -3023,26 +3070,27 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return Collections.emptyMap();
}
+ /**
+ * Fetches the earliest or latest offset from the stream via the {@link
RecordSupplier}
+ */
@Nullable
private SequenceOffsetType getOffsetFromStreamForPartition(PartitionIdType
partition, boolean useEarliestOffset)
{
- synchronized (recordSupplierLock) {
+ recordSupplierLock.lock();
+ try {
StreamPartition<PartitionIdType> topicPartition = new
StreamPartition<>(ioConfig.getStream(), partition);
if (!recordSupplier.getAssignment().contains(topicPartition)) {
- final Set partitions = Collections.singleton(topicPartition);
- recordSupplier.assign(partitions);
- try {
- recordSupplier.seekToEarliest(partitions);
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ // this shouldn't happen, but in case it does...
+ throw new IllegalStateException("Record supplier does not match
current known partitions");
}
return useEarliestOffset
? recordSupplier.getEarliestSequenceNumber(topicPartition)
: recordSupplier.getLatestSequenceNumber(topicPartition);
}
+ finally {
+ recordSupplierLock.unlock();
+ }
}
private void createTasksForGroup(int groupId, int replicas)
@@ -3098,16 +3146,24 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ /**
+ * monitoring method, fetches current partition offsets and lag in a
background reporting thread
+ */
@VisibleForTesting
public void updateCurrentAndLatestOffsets()
{
- try {
- updateCurrentOffsets();
- updateLatestOffsetsFromStream();
- sequenceLastUpdated = DateTimes.nowUtc();
- }
- catch (Exception e) {
- log.warn(e, "Exception while getting current/latest sequences");
+ // if we aren't in a steady state, chill out for a bit, don't worry, we'll
get called later, but if we aren't
+ // healthy go ahead and try anyway to try if possible to provide insight
into how much time is left to fix the
+ // issue for cluster operators since this feeds the lag metrics
+ if (stateManager.isSteadyState() || !stateManager.isHealthy()) {
+ try {
+ updateCurrentOffsets();
+ updatePartitionLagFromStream();
+ sequenceLastUpdated = DateTimes.nowUtc();
+ }
+ catch (Exception e) {
+ log.warn(e, "Exception while getting current/latest sequences");
+ }
}
}
@@ -3136,34 +3192,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Futures.successfulAsList(futures).get(futureTimeoutInSeconds,
TimeUnit.SECONDS);
}
- private void updateLatestOffsetsFromStream() throws InterruptedException
- {
- synchronized (recordSupplierLock) {
- Set<PartitionIdType> partitionIds;
- try {
- partitionIds = recordSupplier.getPartitionIds(ioConfig.getStream());
- }
- catch (Exception e) {
- log.warn("Could not fetch partitions for topic/stream [%s]",
ioConfig.getStream());
- throw new StreamException(e);
- }
-
- Set<StreamPartition<PartitionIdType>> partitions = partitionIds
- .stream()
- .map(e -> new StreamPartition<>(ioConfig.getStream(), e))
- .collect(Collectors.toSet());
-
- recordSupplier.assign(partitions);
- recordSupplier.seekToLatest(partitions);
-
- updateLatestSequenceFromStream(recordSupplier, partitions);
- }
- }
-
- protected abstract void updateLatestSequenceFromStream(
- RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier,
- Set<StreamPartition<PartitionIdType>> partitions
- );
+ protected abstract void updatePartitionLagFromStream();
/**
* Gets 'lag' of currently processed offset behind latest offset as a
measure of difference between offsets.
@@ -3179,17 +3208,21 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
{
- if (!spec.isSuspended() || activelyReadingTaskGroups.size() > 0 ||
pendingCompletionTaskGroups.size() > 0) {
- return activelyReadingTaskGroups
- .values()
- .stream()
- .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
- .flatMap(taskData ->
taskData.getValue().currentSequences.entrySet().stream())
- .collect(Collectors.toMap(
- Entry::getKey,
- Entry::getValue,
- (v1, v2) ->
makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
- ));
+ if (!spec.isSuspended()) {
+ if (activelyReadingTaskGroups.size() > 0 ||
pendingCompletionTaskGroups.size() > 0) {
+ return activelyReadingTaskGroups
+ .values()
+ .stream()
+ .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
+ .flatMap(taskData ->
taskData.getValue().currentSequences.entrySet().stream())
+ .collect(Collectors.toMap(
+ Entry::getKey,
+ Entry::getValue,
+ (v1, v2) ->
makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
+ ));
+ }
+ // nothing is running but we are not suspended, so lets just hang out in
case we get called while things start up
+ return ImmutableMap.of();
} else {
// if supervisor is suspended, no tasks are likely running so use
offsets in metadata, if exist
return getOffsetsFromMetadataStorage();
@@ -3447,8 +3480,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected void emitLag()
{
- if (spec.isSuspended()) {
- // don't emit metrics if supervisor is suspended (lag should still
available in status report)
+ if (spec.isSuspended() || !stateManager.isSteadyState()) {
+ // don't emit metrics if supervisor is suspended or not in a healthy
running state
+ // (lag should still available in status report)
return;
}
try {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 0d0ae06..54aa3be 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -753,7 +753,6 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
{
spec = createMock(SeekableStreamSupervisorSpec.class);
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
-
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(new
SeekableStreamSupervisorIOConfig(
"stream",
@@ -766,13 +765,20 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
false,
new Period("PT30M"),
null,
- null, null
+ null,
+ null
)
{
}).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
- EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new
DruidMonitorSchedulerConfig()).anyTimes();
+ EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new
DruidMonitorSchedulerConfig() {
+ @Override
+ public Duration getEmitterPeriod()
+ {
+ return new Period("PT1S").toStandardDuration();
+ }
+ }).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes();
EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
@@ -986,9 +992,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
}
+ // Test stub for the lag-refresh hook; the refactored hook takes no RecordSupplier or
+ // partition arguments, so this override no longer receives them.
@Override
- protected void updateLatestSequenceFromStream(
- RecordSupplier<String, String> recordSupplier,
Set<StreamPartition<String>> streamPartitions
- )
+ protected void updatePartitionLagFromStream()
{
// do nothing
+ // intentionally a no-op here; NOTE(review): presumably these state tests do not need lag values — confirm
}
@@ -1219,7 +1223,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
protected void emitLag()
{
super.emitLag();
- latch.countDown();
+ // super.emitLag() returns early when the supervisor is suspended or not in its healthy steady
+ // state, so only count the latch down when lag emission could actually have happened.
+ // NOTE(review): presumably the test awaits this latch to observe emitted lag — confirm.
+ if (stateManager.isSteadyState()) {
+ latch.countDown();
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
index 76cf8c6..406d5d2 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
@@ -92,7 +92,7 @@ public class SupervisorStateManager
private final Deque<ExceptionEvent> recentEventsQueue = new
ConcurrentLinkedDeque<>();
- private State supervisorState = BasicState.PENDING;
+ // volatile so readers such as isHealthy()/isSteadyState() see the latest state without locking;
+ // NOTE(review): presumably read from a background reporting thread (lag emission) — confirm.
+ private volatile State supervisorState = BasicState.PENDING;
private boolean atLeastOneSuccessfulRun = false;
private boolean currentRunSuccessful = true;
@@ -214,6 +214,11 @@ public class SupervisorStateManager
return supervisorState != null && supervisorState.isHealthy();
}
+ /**
+ * Returns whether the supervisor is currently in its healthy steady state, i.e. whether the
+ * current {@code supervisorState} equals {@code healthySteadyState}.
+ * <p>
+ * Stricter than {@link #isHealthy()}: a healthy but non-steady state yields {@code false}, and a
+ * {@code null} current state also yields {@code false} (the comparison is called on
+ * {@code healthySteadyState}, not on the possibly-null current state).
+ * NOTE(review): assumes {@code healthySteadyState} is never null; otherwise this throws — confirm.
+ *
+ * @return {@code true} only when the current state is the healthy steady state
+ */
+ public boolean isSteadyState()
+ {
+ return healthySteadyState.equals(supervisorState);
+ }
+
public boolean isAtLeastOneSuccessfulRun()
{
return atLeastOneSuccessfulRun;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]