jsun98 commented on a change in pull request #6431: Add Kinesis Indexing 
Service to core Druid
URL: https://github.com/apache/incubator-druid/pull/6431#discussion_r241906420
 
 

 ##########
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 ##########
 @@ -0,0 +1,2800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.IndexTaskClient;
+import org.apache.druid.indexing.common.TaskInfoProvider;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskQueue;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIOConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
+import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
+import org.apache.druid.indexing.seekablestream.SeekableStreamTuningConfig;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.EntryExistsException;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * this class is the parent class of both the Kafka and Kinesis supervisor. 
All the main run loop
+ * logic is similar enough that it is grouped together into this class.
+ * <p>
+ * incremental handoff & checkpointing are not yet supported by Kinesis, but 
the logic is left in here
+ * so in the future it's easier to implement
+ * <p>
+ * Supervisor responsible for managing the SeekableStreamIndexTasks 
(Kafka/Kinesis) for a single dataSource. At a high level, the class accepts a
+ * {@link SeekableStreamSupervisorSpec} which includes the stream name (topic 
/ stream) and configuration as well as an ingestion spec which will
+ * be used to generate the indexing tasks. The run loop periodically refreshes 
its view of the stream's partitions
+ * and the list of running indexing tasks and ensures that all partitions are 
being read from and that there are enough
+ * tasks to satisfy the desired number of replicas. As tasks complete, new 
tasks are queued to process the next range of
+ * stream sequences.
+ * <p>
+ *
+ * @param <PartitionType> partition id type
+ * @param <SequenceType>  sequence number type
+ */
+public abstract class SeekableStreamSupervisor<PartitionType, SequenceType>
+    implements Supervisor
+{
+  public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = 
"IS_INCREMENTAL_HANDOFF_SUPPORTED";
+  private static final Random RANDOM = ThreadLocalRandom.current();
+  private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
+  private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
+  private static final int MAX_INITIALIZATION_RETRIES = 20;
+
+  private final EmittingLogger log = new EmittingLogger(this.getClass());
+
+  // Internal data structures
+  // --------------------------------------------------------
+
+  /**
+   * A TaskGroup is the main data structure used by SeekableStreamSupervisor 
to organize and monitor stream partitions and
+   * indexing tasks. All the tasks in a TaskGroup should always be doing the 
same thing (reading the same partitions and
+   * starting from the same sequences) and if [replicas] is configured to be 
1, a TaskGroup will contain a single task (the
+   * exception being if the supervisor started up and discovered and adopted 
some already running tasks). At any given
+   * time, there should only be up to a maximum of [taskCount] 
actively-reading task groups (tracked in the [activelyReadingTaskGroups]
+   * map) + zero or more pending-completion task groups (tracked in 
[pendingCompletionTaskGroups]).
+   */
+  private class TaskGroup
+  {
+    final int groupId;
+
+    // This specifies the partitions and starting sequences for this task 
group. It is set on group creation from the data
+    // in [partitionGroups] and never changes during the lifetime of this task 
group, which will live until a task in
+    // this task group has completed successfully, at which point this will be 
destroyed and a new task group will be
+    // created with new starting sequences. This allows us to create 
replacement tasks for failed tasks that process the
same sequences, even if the values in [partitionGroups] have been 
changed.
+    final ImmutableMap<PartitionType, SequenceType> startingSequences;
+
+    final ConcurrentHashMap<String, TaskData> tasks = new 
ConcurrentHashMap<>();
+    final Optional<DateTime> minimumMessageTime;
+    final Optional<DateTime> maximumMessageTime;
+    final Set<PartitionType> exclusiveStartSequenceNumberPartitions;
+    final TreeMap<Integer, Map<PartitionType, SequenceType>> 
checkpointSequences = new TreeMap<>();
+    final String baseSequenceName;
+    DateTime completionTimeout; // is set after signalTasksToFinish(); if not 
done by timeout, take corrective action
+
+    TaskGroup(
+        int groupId,
+        ImmutableMap<PartitionType, SequenceType> startingSequences,
+        Optional<DateTime> minimumMessageTime,
+        Optional<DateTime> maximumMessageTime,
+        Set<PartitionType> exclusiveStartSequenceNumberPartitions
+    )
+    {
+      this.groupId = groupId;
+      this.startingSequences = startingSequences;
+      this.minimumMessageTime = minimumMessageTime;
+      this.maximumMessageTime = maximumMessageTime;
+      this.checkpointSequences.put(0, startingSequences);
+      this.exclusiveStartSequenceNumberPartitions = 
exclusiveStartSequenceNumberPartitions != null
+                                                    ? 
exclusiveStartSequenceNumberPartitions
+                                                    : new HashSet<>();
+      this.baseSequenceName = generateSequenceName(startingSequences, 
minimumMessageTime, maximumMessageTime);
+    }
+
+    int addNewCheckpoint(Map<PartitionType, SequenceType> checkpoint)
+    {
+      checkpointSequences.put(checkpointSequences.lastKey() + 1, checkpoint);
+      return checkpointSequences.lastKey();
+    }
+
+    Set<String> taskIds()
+    {
+      return tasks.keySet();
+    }
+
+  }
+
+  private class TaskData
+  {
+    volatile TaskStatus status;
+    volatile DateTime startTime;
+    volatile Map<PartitionType, SequenceType> currentSequences = new 
HashMap<>();
+
+    @Override
+    public String toString()
+    {
+      return "TaskData{" +
+             "status=" + status +
+             ", startTime=" + startTime +
+             ", checkpointSequences=" + currentSequences +
+             '}';
+    }
+  }
+
+  /**
+   * Notice is used to queue tasks that are internal to the supervisor
+   */
+  private interface Notice
+  {
+    void handle() throws ExecutionException, InterruptedException, 
TimeoutException, JsonProcessingException,
+                         NoSuchMethodException, IllegalAccessException, 
ClassNotFoundException;
+  }
+
+  private static class StatsFromTaskResult
+  {
+    private final String groupId;
+    private final String taskId;
+    private final Map<String, Object> stats;
+
+    public StatsFromTaskResult(
+        int groupId,
+        String taskId,
+        Map<String, Object> stats
+    )
+    {
+      this.groupId = String.valueOf(groupId);
+      this.taskId = taskId;
+      this.stats = stats;
+    }
+
+    public String getGroupId()
+    {
+      return groupId;
+    }
+
+    public String getTaskId()
+    {
+      return taskId;
+    }
+
+    public Map<String, Object> getStats()
+    {
+      return stats;
+    }
+  }
+
+
+  private class RunNotice implements Notice
+  {
+    @Override
+    public void handle() throws ExecutionException, InterruptedException, 
TimeoutException, JsonProcessingException,
+                                NoSuchMethodException, IllegalAccessException, 
ClassNotFoundException
+    {
+      long nowTime = System.currentTimeMillis();
+      if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
+        return;
+      }
+      lastRunTime = nowTime;
+
+      runInternal();
+    }
+  }
+
+  private class GracefulShutdownNotice extends ShutdownNotice
+  {
+    @Override
+    public void handle() throws InterruptedException, ExecutionException, 
TimeoutException
+    {
+      gracefulShutdownInternal();
+      super.handle();
+    }
+  }
+
+  private class ShutdownNotice implements Notice
+  {
+    @Override
+    public void handle() throws InterruptedException, ExecutionException, 
TimeoutException
+    {
+      recordSupplier.close();
+
+      synchronized (stopLock) {
+        stopped = true;
+        stopLock.notifyAll();
+      }
+    }
+  }
+
+  private class ResetNotice implements Notice
+  {
+    final DataSourceMetadata dataSourceMetadata;
+
+    ResetNotice(DataSourceMetadata dataSourceMetadata)
+    {
+      this.dataSourceMetadata = dataSourceMetadata;
+    }
+
+    @Override
+    public void handle()
+    {
+      resetInternal(dataSourceMetadata);
+    }
+  }
+
+  protected class CheckpointNotice implements Notice
+  {
+    @Nullable
+    private final Integer nullableTaskGroupId;
+    @Deprecated
+    private final String baseSequenceName;
+    private final SeekableStreamDataSourceMetadata<PartitionType, 
SequenceType> previousCheckpoint;
+    private final SeekableStreamDataSourceMetadata<PartitionType, 
SequenceType> currentCheckpoint;
+
+    public CheckpointNotice(
+        @Nullable Integer nullableTaskGroupId,
+        @Deprecated String baseSequenceName,
+        SeekableStreamDataSourceMetadata<PartitionType, SequenceType> 
previousCheckpoint,
+        SeekableStreamDataSourceMetadata<PartitionType, SequenceType> 
currentCheckpoint
+    )
+    {
+      this.baseSequenceName = baseSequenceName;
+      this.nullableTaskGroupId = nullableTaskGroupId;
+      this.previousCheckpoint = previousCheckpoint;
+      this.currentCheckpoint = currentCheckpoint;
+    }
+
+    @Override
+    public void handle() throws ExecutionException, InterruptedException
+    {
+      // Find taskGroupId using taskId if it's null. It can be null while 
rolling update.
+      final int taskGroupId;
+      if (nullableTaskGroupId == null) {
+        // We search taskId in activelyReadingTaskGroups and 
pendingCompletionTaskGroups sequentially. This should be fine because
+        // 1) a taskGroup can be moved from activelyReadingTaskGroups to 
pendingCompletionTaskGroups in RunNotice
+        //    (see checkTaskDuration()).
+        // 2) Notices are proceesed by a single thread. So, CheckpointNotice 
and RunNotice cannot be processed at the
+        //    same time.
+        final java.util.Optional<Integer> maybeGroupId = 
activelyReadingTaskGroups
+            .entrySet()
+            .stream()
+            .filter(entry -> {
+              final TaskGroup taskGroup = entry.getValue();
+              return taskGroup.baseSequenceName.equals(baseSequenceName);
+            })
+            .findAny()
+            .map(Entry::getKey);
+
+        taskGroupId = maybeGroupId.orElseGet(() -> pendingCompletionTaskGroups
+            .entrySet()
+            .stream()
+            .filter(entry -> {
+              final List<TaskGroup> taskGroups = entry.getValue();
+              return taskGroups.stream().anyMatch(group -> 
group.baseSequenceName.equals(baseSequenceName));
+            })
+            .findAny()
+            .orElseThrow(() -> new ISE("Cannot find taskGroup for 
baseSequenceName[%s]", baseSequenceName))
+            .getKey());
+
+      } else {
+        taskGroupId = nullableTaskGroupId;
+      }
+
+      // check for consistency
+      // if already received request for this sequenceName and 
dataSourceMetadata combination then return
+      final TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);
+
+      if (isValidTaskGroup(taskGroupId, taskGroup)) {
+        final TreeMap<Integer, Map<PartitionType, SequenceType>> checkpoints = 
taskGroup.checkpointSequences;
+
+        // check validity of previousCheckpoint
+        int index = checkpoints.size();
+        for (int sequenceId : checkpoints.descendingKeySet()) {
+          Map<PartitionType, SequenceType> checkpoint = 
checkpoints.get(sequenceId);
+          // We have already verified the stream of the current checkpoint is 
same with that in ioConfig.
+          // See checkpoint().
+          if 
(checkpoint.equals(previousCheckpoint.getSeekableStreamPartitions()
+                                                  
.getPartitionSequenceNumberMap()
+          )) {
+            break;
+          }
+          index--;
+        }
+        if (index == 0) {
+          throw new ISE("No such previous checkpoint [%s] found", 
previousCheckpoint);
+        } else if (index < checkpoints.size()) {
+          // if the found checkpoint is not the latest one then already 
checkpointed by a replica
+          Preconditions.checkState(index == checkpoints.size() - 1, 
"checkpoint consistency failure");
+          log.info("Already checkpointed with sequences [%s]", 
checkpoints.lastEntry().getValue());
+          return;
+        }
+        final Map<PartitionType, SequenceType> newCheckpoint = 
checkpointTaskGroup(taskGroup, false).get();
+        taskGroup.addNewCheckpoint(newCheckpoint);
+        log.info("Handled checkpoint notice, new checkpoint is [%s] for 
taskGroup [%s]", newCheckpoint, taskGroupId);
+      }
+    }
+
+    boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup)
+    {
+      if (taskGroup == null) {
+        // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
+        if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
+          log.warn(
+              "Ignoring checkpoint request because taskGroup[%d] has already 
stopped indexing and is waiting for "
+              + "publishing segments",
+              taskGroupId
+          );
+          return false;
+        } else if (partitionGroups.containsKey(taskGroupId)) {
+          log.warn("Ignoring checkpoint request because taskGroup[%d] is 
inactive", taskGroupId);
+          return false;
+        } else {
+          throw new ISE("WTH?! cannot find taskGroup [%s] among all 
activelyReadingTaskGroups [%s]", taskGroupId,
+                        activelyReadingTaskGroups
+          );
+        }
+      }
+
+      return true;
+    }
+  }
+
+
+  // Map<{group ID}, {actively reading task group}>; see documentation for 
TaskGroup class
+  private final ConcurrentHashMap<Integer, TaskGroup> 
activelyReadingTaskGroups = new ConcurrentHashMap<>();
+
+  // After telling a taskGroup to stop reading and begin publishing a segment, 
it is moved from [activelyReadingTaskGroups] to here so
+  // we can monitor its status while we queue new tasks to read the next range 
of sequences. This is a list since we could
+  // have multiple sets of tasks publishing at once if time-to-publish > 
taskDuration.
+  // Map<{group ID}, List<{pending completion task groups}>>
+  private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> 
pendingCompletionTaskGroups = new ConcurrentHashMap<>();
+
+  // The starting sequence for a new partition in [partitionGroups] is 
initially set to getNotSetMarker(). When a new task group
+  // is created and is assigned partitions, if the sequence in 
[partitionGroups] is getNotSetMarker() it will take the starting
+  // sequence value from the metadata store, and if it can't find it there, 
from stream. Once a task begins
+  // publishing, the sequence in partitionGroups will be updated to the ending 
sequence of the publishing-but-not-yet-
+  // completed task, which will cause the next set of tasks to begin reading 
from where the previous task left
+  // off. If that previous task now fails, we will set the sequence in 
[partitionGroups] back to getNotSetMarker() which will
+  // cause successive tasks to again grab their starting sequence from 
metadata store. This mechanism allows us to
+  // start up successive tasks without waiting for the previous tasks to 
succeed and still be able to handle task
+  // failures during publishing.
+  // Map<{group ID}, Map<{partition ID}, {startingOffset}>>
+  private final ConcurrentHashMap<Integer, ConcurrentHashMap<PartitionType, 
SequenceType>> partitionGroups = new ConcurrentHashMap<>();
+
+  protected final ObjectMapper sortingMapper;
+  protected final List<PartitionType> partitionIds = new 
CopyOnWriteArrayList<>();
+  protected volatile DateTime sequenceLastUpdated;
+
+
+  private final Set<PartitionType> subsequentlyDiscoveredPartitions = new 
HashSet<>();
+  private final TaskStorage taskStorage;
+  private final TaskMaster taskMaster;
+  private final IndexerMetadataStorageCoordinator 
indexerMetadataStorageCoordinator;
+  private final SeekableStreamIndexTaskClient<PartitionType, SequenceType> 
taskClient;
+  private final SeekableStreamSupervisorSpec spec;
+  private final String dataSource;
+  private final SeekableStreamSupervisorIOConfig ioConfig;
+  private final SeekableStreamSupervisorTuningConfig tuningConfig;
+  private final SeekableStreamTuningConfig taskTuningConfig;
+  private final String supervisorId;
+  private final TaskInfoProvider taskInfoProvider;
+  private final long futureTimeoutInSeconds; // how long to wait for async 
operations to complete
+  private final RowIngestionMetersFactory rowIngestionMetersFactory;
+  private final ExecutorService exec;
+  private final ScheduledExecutorService scheduledExec;
+  private final ScheduledExecutorService reportingExec;
+  private final ListeningExecutorService workerExec;
+  private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
+  private final Object stopLock = new Object();
+  private final Object stateChangeLock = new Object();
+  private final Object recordSupplierLock = new Object();
+
+  private final boolean useExclusiveStartingSequence;
+  private boolean listenerRegistered = false;
+  private long lastRunTime;
+  private int initRetryCounter = 0;
+  private volatile DateTime firstRunTime;
+  private volatile RecordSupplier<PartitionType, SequenceType> recordSupplier;
+  private volatile boolean started = false;
+  private volatile boolean stopped = false;
+  private volatile boolean lifecycleStarted = false;
+
+
+  public SeekableStreamSupervisor(
+      final String supervisorId,
+      final TaskStorage taskStorage,
+      final TaskMaster taskMaster,
+      final IndexerMetadataStorageCoordinator 
indexerMetadataStorageCoordinator,
+      final SeekableStreamIndexTaskClientFactory<? extends 
SeekableStreamIndexTaskClient<PartitionType, SequenceType>> taskClientFactory,
+      final ObjectMapper mapper,
+      final SeekableStreamSupervisorSpec spec,
+      final RowIngestionMetersFactory rowIngestionMetersFactory,
+      final boolean useExclusiveStartingSequence
+  )
+  {
+    this.taskStorage = taskStorage;
+    this.taskMaster = taskMaster;
+    this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
+    this.sortingMapper = 
mapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
+    this.spec = spec;
+    this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+    this.useExclusiveStartingSequence = useExclusiveStartingSequence;
+
+    this.dataSource = spec.getDataSchema().getDataSource();
+    this.ioConfig = spec.getIoConfig();
+    this.tuningConfig = spec.getTuningConfig();
+    this.taskTuningConfig = this.tuningConfig.copyOf();
 
 Review comment:
   I've refactored this and added a `convertToTaskTuningConfig()` method to 
`SeekableStreamSupervisorTuningConfig`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to