jsun98 commented on a change in pull request #6431: Add Kinesis Indexing 
Service to core Druid
URL: https://github.com/apache/incubator-druid/pull/6431#discussion_r238020083
 
 

 ##########
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 ##########
 @@ -16,55 +16,1991 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.druid.indexing.seekablestream;
 
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.LookupNodeService;
+import org.apache.druid.discovery.NodeType;
+import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
+import 
org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
+import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.RealtimeIndexTask;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.collect.Utils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.indexing.RealtimeIOConfig;
+import org.apache.druid.segment.realtime.FireDepartment;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
+import 
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
+import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
+import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CircularBuffer;
+import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.servlet.http.HttpServletRequest;
+import javax.validation.constraints.NotNull;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 /**
  * Interface for abstracting the indexing task run logic. Only used by Kafka 
indexing tasks,
  * but will also be used by Kinesis indexing tasks once implemented
  *
  * @param <PartitionType> Partition Number Type
- * @param <SequenceType> Sequence Number Type
+ * @param <SequenceType>  Sequence Number Type
  */
-public interface SeekableStreamIndexTaskRunner<PartitionType, SequenceType> 
extends ChatHandler
+public abstract class SeekableStreamIndexTaskRunner<PartitionType, 
SequenceType> implements ChatHandler
 {
-  Appenderator getAppenderator();
+  public enum Status
+  {
+    NOT_STARTED,
+    STARTING,
+    READING,
+    PAUSED,
+    PUBLISHING
+  }
 
-  TaskStatus run(TaskToolbox toolbox);
+  private final EmittingLogger log = new EmittingLogger(this.getClass());
+  private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
+  private static final String METADATA_PUBLISH_PARTITIONS = 
"publishPartitions";
 
-  void stopGracefully();
+  private final Map<PartitionType, SequenceType> endOffsets;
+  private final Map<PartitionType, SequenceType> currOffsets = new 
ConcurrentHashMap<>();
+  private final Map<PartitionType, SequenceType> lastPersistedOffsets = new 
ConcurrentHashMap<>();
 
-  @VisibleForTesting
-  RowIngestionMeters getRowIngestionMeters();
+  // The pause lock and associated conditions are to support coordination 
between the Jetty threads and the main
+  // ingestion loop. The goal is to provide callers of the API a guarantee 
that if pause() returns successfully
+  // the ingestion loop has been stopped at the returned sequences and will 
not ingest any more data until resumed. The
+  // fields are used as follows (every step requires acquiring [pauseLock]):
+  //   Pausing:
+  //   - In pause(), [pauseRequested] is set to true and then execution waits 
for [status] to change to PAUSED, with the
+  //     condition checked when [hasPaused] is signalled.
+  //   - In possiblyPause() called from the main loop, if [pauseRequested] is 
true, [status] is set to PAUSED,
+  //     [hasPaused] is signalled, and execution pauses until [pauseRequested] 
becomes false, either by being set or by
+  //     the [pauseMillis] timeout elapsing. [pauseRequested] is checked when 
[shouldResume] is signalled.
+  //   Resuming:
+  //   - In resume(), [pauseRequested] is set to false, [shouldResume] is 
signalled, and execution waits for [status] to
+  //     change to something other than PAUSED, with the condition checked 
when [shouldResume] is signalled.
+  //   - In possiblyPause(), when [shouldResume] is signalled, if 
[pauseRequested] has become false the pause loop ends,
+  //     [status] is changed to STARTING and [shouldResume] is signalled.
+  private final Lock pauseLock = new ReentrantLock();
+  private final Condition hasPaused = pauseLock.newCondition();
+  private final Condition shouldResume = pauseLock.newCondition();
 
-  @VisibleForTesting
-  SeekableStreamIndexTask.Status getStatus();
+  protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
+  private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
+
+  // [statusLock] is used to synchronize the Jetty thread calling 
stopGracefully() with the main run thread. It prevents
+  // the main run thread from switching into a publishing state while the 
stopGracefully() thread thinks it's still in
+  // a pre-publishing state. This is important because stopGracefully() will 
try to use the [stopRequested] flag to stop
+  // the main thread where possible, but this flag is not honored once 
publishing has begun so in this case we must
+  // interrupt the thread. The lock ensures that if the run thread is about to 
transition into publishing state, it
+  // blocks until after stopGracefully() has set [stopRequested] and then does 
a final check on [stopRequested] before
+  // transitioning to publishing state.
+  private final Object statusLock = new Object();
+
+  protected final Lock pollRetryLock = new ReentrantLock();
+  protected final Condition isAwaitingRetry = pollRetryLock.newCondition();
+
+  private final SeekableStreamIndexTask<PartitionType, SequenceType> task;
+  private final SeekableStreamIOConfig<PartitionType, SequenceType> ioConfig;
+  private final SeekableStreamTuningConfig tuningConfig;
+  private final InputRowParser<ByteBuffer> parser;
+  private final AuthorizerMapper authorizerMapper;
+  private final Optional<ChatHandlerProvider> chatHandlerProvider;
+  private final CircularBuffer<Throwable> savedParseExceptions;
+  private final String stream;
+  private final RowIngestionMeters rowIngestionMeters;
+
+  private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
+  private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = 
new ArrayList<>();
+  private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = 
new ArrayList<>();
+  private final Map<PartitionType, SequenceType> initialOffsetsSnapshot = new 
HashMap<>();
+  private final Set<PartitionType> exclusiveStartingPartitions = new 
HashSet<>();
+
+  // true for kafka, false for kinesis
+  private final boolean isSkipSegmentLineageCheck;
+
+  private volatile DateTime startTime;
+  private volatile Status status = Status.NOT_STARTED; // this is only ever 
set by the task runner thread (runThread)
+  private volatile TaskToolbox toolbox;
+  private volatile Thread runThread;
+  private volatile Appenderator appenderator;
+  private volatile StreamAppenderatorDriver driver;
+  private volatile IngestionState ingestionState;
+
+  protected volatile boolean pauseRequested = false;
+  private volatile long nextCheckpointTime;
+
+  private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
+  private volatile Throwable backgroundThreadException;
+
+  public SeekableStreamIndexTaskRunner(
+      final SeekableStreamIndexTask<PartitionType, SequenceType> task,
+      final InputRowParser<ByteBuffer> parser,
+      final AuthorizerMapper authorizerMapper,
+      final Optional<ChatHandlerProvider> chatHandlerProvider,
+      final CircularBuffer<Throwable> savedParseExceptions,
+      final RowIngestionMetersFactory rowIngestionMetersFactory,
+      final boolean isSkipSegmentLineageCheck
+  )
+  {
+    this.task = task;
+    this.ioConfig = task.getIOConfig();
+    this.tuningConfig = task.getTuningConfig();
+    this.parser = parser;
+    this.authorizerMapper = authorizerMapper;
+    this.chatHandlerProvider = chatHandlerProvider;
+    this.savedParseExceptions = savedParseExceptions;
+    this.stream = ioConfig.getStartPartitions().getStream();
+    this.rowIngestionMeters = 
rowIngestionMetersFactory.createRowIngestionMeters();
+    this.isSkipSegmentLineageCheck = isSkipSegmentLineageCheck;
+
+    this.endOffsets = new 
ConcurrentHashMap<>(ioConfig.getEndPartitions().getPartitionSequenceNumberMap());
+    this.sequences = new CopyOnWriteArrayList<>();
+    this.ingestionState = IngestionState.NOT_STARTED;
+
+    resetNextCheckpointTime();
+  }
+
+
+  public TaskStatus run(TaskToolbox toolbox)
+  {
+    try {
+      return runInternal(toolbox);
+    }
+    catch (Exception e) {
+      log.error(e, "Encountered exception while running task.");
+      final String errorMsg = Throwables.getStackTraceAsString(e);
+      
toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(errorMsg));
+      return TaskStatus.failure(
+          task.getId(),
+          errorMsg
+      );
+    }
+  }
+
+  private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
+  {
+    log.info("SeekableStream indexing task starting up!");
+    startTime = DateTimes.nowUtc();
+    status = Status.STARTING;
+    this.toolbox = toolbox;
+
+
+    if (!restoreSequences()) {
+      final TreeMap<Integer, Map<PartitionType, SequenceType>> checkpoints = 
getCheckPointsFromContext(toolbox, task);
+      if (checkpoints != null) {
+        boolean exclusive = false;
+        Iterator<Map.Entry<Integer, Map<PartitionType, SequenceType>>> 
sequenceOffsets = checkpoints.entrySet()
+                                                                               
                     .iterator();
+        Map.Entry<Integer, Map<PartitionType, SequenceType>> previous = 
sequenceOffsets.next();
+        while (sequenceOffsets.hasNext()) {
+          Map.Entry<Integer, Map<PartitionType, SequenceType>> current = 
sequenceOffsets.next();
+          sequences.add(createSequenceMetaData(
+              previous.getKey(),
+              StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 
previous.getKey()),
+              previous.getValue(),
+              current.getValue(),
+              true,
+              exclusive ? previous.getValue().keySet() : null
+          ));
+          previous = current;
+          exclusive = true;
+        }
+        sequences.add(createSequenceMetaData(
+            previous.getKey(),
+            StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 
previous.getKey()),
+            previous.getValue(),
+            endOffsets,
+            false,
+            exclusive ? previous.getValue().keySet() : null
+        ));
+      } else {
+        sequences.add(createSequenceMetaData(
+            0,
+            StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
+            ioConfig.getStartPartitions().getPartitionSequenceNumberMap(),
+            endOffsets,
+            false,
+            null
+        ));
+      }
+    }
+
+    log.info("Starting with sequences:  %s", sequences);
+
+    if (chatHandlerProvider.isPresent()) {
+      log.info("Found chat handler of class[%s]", 
chatHandlerProvider.get().getClass().getName());
+      chatHandlerProvider.get().register(task.getId(), this, false);
+    } else {
+      log.warn("No chat handler detected");
+    }
+
+    runThread = Thread.currentThread();
+
+    // Set up FireDepartmentMetrics
+    final FireDepartment fireDepartmentForMetrics = new FireDepartment(
+        task.getDataSchema(),
+        new RealtimeIOConfig(null, null, null),
+        null
+    );
+    FireDepartmentMetrics fireDepartmentMetrics = 
fireDepartmentForMetrics.getMetrics();
+    toolbox.getMonitorScheduler()
+           .addMonitor(TaskRealtimeMetricsMonitorBuilder.build(task, 
fireDepartmentForMetrics, rowIngestionMeters));
+
+    final String lookupTier = 
task.getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER);
+    final LookupNodeService lookupNodeService = lookupTier == null ?
+                                                toolbox.getLookupNodeService() 
:
+                                                new 
LookupNodeService(lookupTier);
+
+    final DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
+        toolbox.getDruidNode(),
+        NodeType.PEON,
+        ImmutableMap.of(
+            toolbox.getDataNodeService().getName(), 
toolbox.getDataNodeService(),
+            lookupNodeService.getName(), lookupNodeService
+        )
+    );
+
+    Throwable caughtExceptionOuter = null;
+    try (final RecordSupplier<PartitionType, SequenceType> recordSupplier = 
task.newRecordSupplier()) {
+      toolbox.getDataSegmentServerAnnouncer().announce();
+      toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
+
+      appenderator = task.newAppenderator(fireDepartmentMetrics, toolbox);
+      driver = task.newDriver(appenderator, toolbox, fireDepartmentMetrics);
+
+      final String stream = ioConfig.getStartPartitions().getStream();
+
+      // Start up, set up initial sequences.
+      final Object restoredMetadata = driver.startJob();
+      if (restoredMetadata == null) {
+        // no persist has happened so far
+        // so either this is a brand new task or replacement of a failed task
+        
Preconditions.checkState(sequences.get(0).startOffsets.entrySet().stream().allMatch(
+            partitionOffsetEntry ->
+                
createSequenceNumber(partitionOffsetEntry.getValue()).compareTo(
+                    createSequenceNumber(ioConfig.getStartPartitions()
+                                                 
.getPartitionSequenceNumberMap()
+                                                 
.get(partitionOffsetEntry.getKey())
+                    )) >= 0
+        ), "Sequence sequences are not compatible with start sequences of 
task");
+        currOffsets.putAll(sequences.get(0).startOffsets);
+      } else {
+        @SuppressWarnings("unchecked")
+        final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
+        final SeekableStreamPartitions<PartitionType, SequenceType> 
restoredNextPartitions = createSeekableStreamPartitions(
+            toolbox.getObjectMapper(),
+            restoredMetadataMap.get(METADATA_NEXT_PARTITIONS)
+        );
+
+        
currOffsets.putAll(restoredNextPartitions.getPartitionSequenceNumberMap());
+
+        // Sanity checks.
+        if 
(!restoredNextPartitions.getStream().equals(ioConfig.getStartPartitions().getStream()))
 {
+          throw new ISE(
+              "WTF?! Restored stream[%s] but expected stream[%s]",
+              restoredNextPartitions.getStream(),
+              ioConfig.getStartPartitions().getStream()
+          );
+        }
+
+        if 
(!currOffsets.keySet().equals(ioConfig.getStartPartitions().getPartitionSequenceNumberMap().keySet()))
 {
+          throw new ISE(
+              "WTF?! Restored partitions[%s] but expected partitions[%s]",
+              currOffsets.keySet(),
+              
ioConfig.getStartPartitions().getPartitionSequenceNumberMap().keySet()
+          );
+        }
+        // sequences size can be 0 only when all sequences got published and 
task stopped before it could finish
+        // which is super rare
+        if (sequences.size() == 0 || sequences.get(sequences.size() - 
1).isCheckpointed()) {
+          this.endOffsets.putAll(sequences.size() == 0
+                                 ? currOffsets
+                                 : sequences.get(sequences.size() - 
1).getEndOffsets());
+          log.info("End sequences changed to [%s]", endOffsets);
+        }
+      }
+
+      // Filter out partitions with END_OF_SHARD markers since these 
partitions have already been fully read. This
+      // should have been done by the supervisor already so this is defensive.
+      int numPreFilterPartitions = currOffsets.size();
+      if (currOffsets.entrySet().removeIf(x -> 
OrderedPartitionableRecord.END_OF_SHARD_MARKER.equals(x.getValue()))) {
+        log.info(
+            "Removed [%d] partitions from assignment which have already been 
closed",
+            numPreFilterPartitions - currOffsets.size()
+        );
+      }
+
+      // Set up committer.
+      final Supplier<Committer> committerSupplier = () -> {
+        final Map<PartitionType, SequenceType> snapshot = 
ImmutableMap.copyOf(currOffsets);
+        lastPersistedOffsets.clear();
+        lastPersistedOffsets.putAll(snapshot);
+
+        return new Committer()
+        {
+          @Override
+          public Object getMetadata()
+          {
+            return ImmutableMap.of(
+                METADATA_NEXT_PARTITIONS, new SeekableStreamPartitions<>(
+                    ioConfig.getStartPartitions().getStream(),
+                    snapshot
+                )
+            );
+          }
+
+          @Override
+          public void run()
+          {
+            // Do nothing.
+          }
+        };
+      };
+
+      // restart publishing of sequences (if any)
+      maybePersistAndPublishSequences(committerSupplier);
+
+      Set<StreamPartition<PartitionType>> assignment = 
assignPartitions(recordSupplier);
+      possiblyResetDataSourceMetadata(recordSupplier, assignment);
+      seekToStartingSequence(recordSupplier, assignment);
+
+      ingestionState = IngestionState.BUILD_SEGMENTS;
+
+      // Main loop.
+      // Could eventually support leader/follower mode (for keeping replicas 
more in sync)
+      boolean stillReading = !assignment.isEmpty();
+      status = Status.READING;
+      Throwable caughtExceptionInner = null;
+
+      initialOffsetsSnapshot.putAll(currOffsets);
+      
exclusiveStartingPartitions.addAll(ioConfig.getExclusiveStartSequenceNumberPartitions());
+
+      try {
+        while (stillReading) {
+          if (possiblyPause()) {
+            // The partition assignments may have changed while paused by a 
call to setEndOffsets() so reassign
+            // partitions upon resuming. This is safe even if the end 
sequences have not been modified.
+            assignment = assignPartitions(recordSupplier);
+            possiblyResetDataSourceMetadata(recordSupplier, assignment);
+            seekToStartingSequence(recordSupplier, assignment);
+
+            if (assignment.isEmpty()) {
+              log.info("All partitions have been fully read");
+              publishOnStop.set(true);
+              stopRequested.set(true);
+            }
+          }
+
+          // if stop is requested or task's end sequence is set by call to 
setEndOffsets method with finish set to true
+          if (stopRequested.get() || sequences.get(sequences.size() - 
1).isCheckpointed()) {
+            status = Status.PUBLISHING;
+          }
+
+          if (stopRequested.get()) {
+            break;
+          }
+
+          if (backgroundThreadException != null) {
+            throw new RuntimeException(backgroundThreadException);
+          }
+
+          checkPublishAndHandoffFailure();
+
+          maybePersistAndPublishSequences(committerSupplier);
+
+
+          // calling getRecord() ensures that exceptions specific to 
kafka/kinesis like OffsetOutOfRangeException
+          // are handled in the subclasses
+          List<OrderedPartitionableRecord<PartitionType, SequenceType>> 
records = getRecords(recordSupplier, toolbox);
+
+          stillReading = !assignment.isEmpty();
+
+          SequenceMetadata sequenceToCheckpoint = null;
+          for (OrderedPartitionableRecord<PartitionType, SequenceType> record 
: records) {
+
+            // for Kafka, the end offsets are exclusive, so skip it
+            if (getRunnerType() == Type.KAFKA && 
createSequenceNumber(record.getSequenceNumber()).equals(
 
 Review comment:
   the behavior for the original kafka implementation is to read from 
startingOffsets (inclusive) to endOffsets (exclusive), whereas for kinesis 
the behavior is to read from start (inclusive) to end (inclusive) due to 
limitations in getting the next sequence number. This check is added here to 
ensure similar behavior with the old kafka task implementation. If you look at 
`KafkaSequenceMetadata`, the behavior of canHandle() is copied from the old 
kafka task implementation.
   ```java
       protected boolean canHandle(OrderedPartitionableRecord<Integer, Long> 
record)
       {
         lock.lock();
         try {
           final OrderedSequenceNumber<Long> partitionEndOffset = 
createSequenceNumber(endOffsets.get(record.getPartitionId()));
           final OrderedSequenceNumber<Long> partitionStartOffset = 
createSequenceNumber(startOffsets.get(record.getPartitionId()));
           final OrderedSequenceNumber<Long> recordOffset = 
createSequenceNumber(record.getSequenceNumber());
           return isOpen()
                  && recordOffset != null
                  && partitionEndOffset != null
                  && partitionStartOffset != null
                  && recordOffset.compareTo(partitionStartOffset) >= 0
                  && recordOffset.compareTo(partitionEndOffset) < 0;
         }
         finally {
           lock.unlock();
         }
       }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to