AmatyaAvadhanula commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1285391039


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.CriticalAction;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.indexing.overlord.TaskLockInfo;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
+ * your task for the segment intervals.
+ */
+public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
+{
+  private final Set<DataSegment> segments;
+
+  @Nullable
+  private final DataSourceMetadata startMetadata;
+  @Nullable
+  private final DataSourceMetadata endMetadata;
+  @Nullable
+  private final String dataSource;
+
+  public static SegmentTransactionalAppendAction appendAction(
+      Set<DataSegment> segments,
+      @Nullable DataSourceMetadata startMetadata,
+      @Nullable DataSourceMetadata endMetadata
+  )
+  {
+    return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata, null);
+  }
+
+  @JsonCreator
+  private SegmentTransactionalAppendAction(
+      @JsonProperty("segments") @Nullable Set<DataSegment> segments,
+      @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
+      @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
+      @JsonProperty("dataSource") @Nullable String dataSource
+  )
+  {
+    this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments);
+    this.startMetadata = startMetadata;
+    this.endMetadata = endMetadata;
+    this.dataSource = dataSource;
+  }
+
+  @JsonProperty
+  public Set<DataSegment> getSegments()
+  {
+    return segments;
+  }
+
+  @JsonProperty
+  @Nullable
+  public DataSourceMetadata getStartMetadata()
+  {
+    return startMetadata;
+  }
+
+  @JsonProperty
+  @Nullable
+  public DataSourceMetadata getEndMetadata()
+  {
+    return endMetadata;
+  }
+
+  @JsonProperty
+  @Nullable
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @Override
+  public TypeReference<SegmentPublishResult> getReturnTypeReference()
+  {
+    return new TypeReference<SegmentPublishResult>()
+    {
+    };
+  }
+
+  /**
+   * Performs some sanity checks and publishes the given segments.
+   */
+  @Override
+  public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
+  {
+    final SegmentPublishResult retVal;
+
+    if (segments.isEmpty()) {
+      // A stream ingestion task didn't ingest any rows and created no segments (e.g., all records were unparseable),
+      // but still needs to update metadata with the progress that the task made.
+      try {
+        retVal = toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly(
+            dataSource,
+            startMetadata,
+            endMetadata
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return retVal;
+    }
+
+    final Set<DataSegment> allSegments = new HashSet<>(segments);
+
+    String datasource = task.getDataSource();
+    Map<Interval, TaskLock> replaceLocks = new HashMap<>();
+    for (TaskLock lock : TaskLocks.findReplaceLocksForSegments(datasource, toolbox.getTaskLockbox(), segments)) {
+      replaceLocks.put(lock.getInterval(), lock);
+    }
+    Map<DataSegment, TaskLockInfo> appendSegmentLockMap = new HashMap<>();
+    Set<TaskLockInfo> taskLockInfos = new HashSet<>();
+    for (TaskLock taskLock : replaceLocks.values()) {
+      taskLockInfos.add(getTaskLockInfo(taskLock));
+    }
+
+    for (DataSegment segment : segments) {
+      Interval interval = segment.getInterval();
+      for (Interval key : replaceLocks.keySet()) {
+        if (key.contains(interval)) {
+          appendSegmentLockMap.put(segment, getTaskLockInfo(replaceLocks.get(key)));
+        }
+      }
+    }
+
+    try {
+      retVal = toolbox.getTaskLockbox().doInCriticalSection(
+          task,
+          allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+          CriticalAction.<SegmentPublishResult>builder()
+              .onValidLocks(
+                  () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(

Review Comment:
   Done
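
   (For context, a hedged usage sketch that is not part of this PR: a task could submit the new action through its action client, mirroring how SegmentTransactionalInsertAction is submitted elsewhere in this review. The null metadata arguments are an assumption for a plain batch append.)

       // Hedged sketch, not code from this PR: submitting the new append action from a task,
       // using the appendAction(segments, startMetadata, endMetadata) factory shown above.
       final SegmentPublishResult publishResult = toolbox.getTaskActionClient().submit(
           SegmentTransactionalAppendAction.appendAction(segmentsToPublish, null, null)
       );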



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.CriticalAction;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.indexing.overlord.TaskLockInfo;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
+ * your task for the segment intervals.
+ */
+public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
+{
+  private final Set<DataSegment> segments;
+
+  @Nullable
+  private final DataSourceMetadata startMetadata;
+  @Nullable
+  private final DataSourceMetadata endMetadata;
+  @Nullable
+  private final String dataSource;
+
+  public static SegmentTransactionalAppendAction appendAction(
+      Set<DataSegment> segments,
+      @Nullable DataSourceMetadata startMetadata,
+      @Nullable DataSourceMetadata endMetadata
+  )
+  {
+    return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata, null);
+  }
+
+  @JsonCreator
+  private SegmentTransactionalAppendAction(
+      @JsonProperty("segments") @Nullable Set<DataSegment> segments,
+      @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
+      @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
+      @JsonProperty("dataSource") @Nullable String dataSource
+  )
+  {
+    this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments);
+    this.startMetadata = startMetadata;
+    this.endMetadata = endMetadata;
+    this.dataSource = dataSource;
+  }
+
+  @JsonProperty
+  public Set<DataSegment> getSegments()
+  {
+    return segments;
+  }
+
+  @JsonProperty
+  @Nullable
+  public DataSourceMetadata getStartMetadata()
+  {
+    return startMetadata;
+  }
+
+  @JsonProperty
+  @Nullable
+  public DataSourceMetadata getEndMetadata()
+  {
+    return endMetadata;
+  }
+
+  @JsonProperty
+  @Nullable
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @Override
+  public TypeReference<SegmentPublishResult> getReturnTypeReference()
+  {
+    return new TypeReference<SegmentPublishResult>()
+    {
+    };
+  }
+
+  /**
+   * Performs some sanity checks and publishes the given segments.
+   */
+  @Override
+  public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
+  {
+    final SegmentPublishResult retVal;
+
+    if (segments.isEmpty()) {
+      // A stream ingestion task didn't ingest any rows and created no segments (e.g., all records were unparseable),
+      // but still needs to update metadata with the progress that the task made.
+      try {
+        retVal = toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly(
+            dataSource,
+            startMetadata,
+            endMetadata
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return retVal;
+    }
+
+    final Set<DataSegment> allSegments = new HashSet<>(segments);
+
+    String datasource = task.getDataSource();
+    Map<Interval, TaskLock> replaceLocks = new HashMap<>();
+    for (TaskLock lock : TaskLocks.findReplaceLocksForSegments(datasource, toolbox.getTaskLockbox(), segments)) {
+      replaceLocks.put(lock.getInterval(), lock);
+    }
+    Map<DataSegment, TaskLockInfo> appendSegmentLockMap = new HashMap<>();
+    Set<TaskLockInfo> taskLockInfos = new HashSet<>();
+    for (TaskLock taskLock : replaceLocks.values()) {
+      taskLockInfos.add(getTaskLockInfo(taskLock));
+    }
+
+    for (DataSegment segment : segments) {
+      Interval interval = segment.getInterval();
+      for (Interval key : replaceLocks.keySet()) {
+        if (key.contains(interval)) {
+          appendSegmentLockMap.put(segment, getTaskLockInfo(replaceLocks.get(key)));
+        }
+      }
+    }
+
+    try {
+      retVal = toolbox.getTaskLockbox().doInCriticalSection(
+          task,
+          allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+          CriticalAction.<SegmentPublishResult>builder()
+              .onValidLocks(
+                  () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
+                      segments,
+                      null,
+                      startMetadata,
+                      endMetadata,
+                      appendSegmentLockMap,
+                      taskLockInfos,
+                      true
+                  )
+              )
+              .onInvalidLocks(
+                  () -> SegmentPublishResult.fail(
+                      "Invalid task locks. Maybe they are revoked by a higher priority task."
+                      + " Please check the overlord log for details."
+                  )
+              )
+              .build()
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    // Emit metrics
+    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+
+    if (retVal.isSuccess()) {
+      toolbox.getEmitter().emit(metricBuilder.build("segment/txn/success", 1));
+    } else {
+      toolbox.getEmitter().emit(metricBuilder.build("segment/txn/failure", 1));
+    }
+
+    // getSegments() should return an empty set if announceHistoricalSegments() failed
+    for (DataSegment segment : retVal.getSegments()) {
+      metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
+      metricBuilder.setDimension(
+          DruidMetrics.PARTITIONING_TYPE,
+          segment.getShardSpec() == null ? null : segment.getShardSpec().getType()
+      );
+      toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
+    }
+
+    return retVal;
+  }
+
+
+  private TaskLockInfo getTaskLockInfo(TaskLock taskLock)
+  {
+    return new TaskLockInfo(taskLock.getInterval(), taskLock.getVersion());
+  }

Review Comment:
   TaskLock is not available in that package, which is why the class TaskLockInfo needs to be created there and this method cannot be implemented
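
   For readers following the new class, here is a minimal sketch of what a TaskLockInfo holder in org.apache.druid.indexing.overlord could look like, assuming (as the construction above suggests) it only needs to carry the lock's interval and version; the actual class in this PR may hold more.

       // Hedged sketch only; the real TaskLockInfo in this PR may differ.
       package org.apache.druid.indexing.overlord;

       import org.joda.time.Interval;
       import java.util.Objects;

       public class TaskLockInfo
       {
         private final Interval interval;
         private final String version;

         public TaskLockInfo(Interval interval, String version)
         {
           this.interval = interval;
           this.version = version;
         }

         public Interval getInterval()
         {
           return interval;
         }

         public String getVersion()
         {
           return version;
         }

         // equals/hashCode so instances can be used in sets and as map values, as in the action above.
         @Override
         public boolean equals(Object o)
         {
           if (this == o) {
             return true;
           }
           if (o == null || getClass() != o.getClass()) {
             return false;
           }
           TaskLockInfo that = (TaskLockInfo) o;
           return interval.equals(that.interval) && version.equals(that.version);
         }

         @Override
         public int hashCode()
         {
           return Objects.hash(interval, version);
         }
       }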



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java:
##########
@@ -430,7 +429,8 @@ protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> inter
       }
 
       prev = cur;
-      final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
+      final TaskLockType taskLockType = TaskLockType.valueOf(getContextValue(Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name()));

Review Comment:
   done
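
   As a hedged illustration (not from the PR), the lock type is now chosen per task rather than hard-coded to EXCLUSIVE; a task that opts into the new behaviour would carry the lock type in its context. This assumes Tasks.TASK_LOCK_TYPE is the context key read by the snippet above and that TaskLockType includes APPEND, as used elsewhere in this PR.

       // Hedged illustration only.
       Map<String, Object> context = new HashMap<>();
       context.put(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name());
       // With this context, getContextValue(Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name())
       // resolves to "APPEND", so the task requests APPEND rather than EXCLUSIVE time-chunk locks.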



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java:
##########
@@ -724,9 +725,12 @@ public static NonnullPair<Interval, String> findIntervalAndVersion(
           }
         }
         // We don't have a lock for this interval, so we should lock it now.
+        if (taskLockType == null) {
+          taskLockType = TaskLockType.EXCLUSIVE;
+        }

Review Comment:
   Fixed



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -911,9 +914,23 @@ private TaskStatus generateAndPublishSegments(
     }
 
 
-    final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) ->
-        toolbox.getTaskActionClient()
-               .submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish));
+    final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) -> {
+      TaskLockType lockType = TaskLockType.valueOf(getContextValue(Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name()));
+      switch (lockType) {
+        case REPLACE:
+          return toolbox.getTaskActionClient().submit(
+              SegmentTransactionalReplaceAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish)

Review Comment:
   Changed
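
   The quoted hunk is cut off at the REPLACE branch. Purely as a hedged sketch (not the author's code), the remaining dispatch might reuse the append and insert actions that appear elsewhere in this review, for example:

       // Hedged sketch of how the rest of the dispatch could look; the actual PR code may differ.
       switch (lockType) {
         case REPLACE:
           return toolbox.getTaskActionClient().submit(
               SegmentTransactionalReplaceAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish)
           );
         case APPEND:
           // appendAction(segments, startMetadata, endMetadata) is the factory shown earlier in this review.
           return toolbox.getTaskActionClient().submit(
               SegmentTransactionalAppendAction.appendAction(segmentsToPublish, null, null)
           );
         default:
           // Fall back to the pre-existing transactional insert for EXCLUSIVE (and any other) lock type.
           return toolbox.getTaskActionClient().submit(
               SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish)
           );
       }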



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.CriticalAction;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.indexing.overlord.TaskLockInfo;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
+ * your task for the segment intervals.
+ */
+public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
+{
+  /**
+   * Set of segments that was fully overshadowed by new segments, {@link SegmentTransactionalReplaceAction#segments}
+   */
+  @Nullable
+  private final Set<DataSegment> segmentsToBeOverwritten;
+  /**
+   * Set of segments to be inserted into metadata storage
+   */
+  private final Set<DataSegment> segments;
+  /**
+   * Set of segments to be dropped (mark unused) when new segments, {@link SegmentTransactionalReplaceAction#segments},
+   * are inserted into metadata storage.
+   */
+  @Nullable
+  private final Set<DataSegment> segmentsToBeDropped;
+
+  @Nullable
+  private final DataSourceMetadata startMetadata;
+  @Nullable
+  private final DataSourceMetadata endMetadata;
+  @Nullable
+  private final String dataSource;
+
+  public static SegmentTransactionalReplaceAction overwriteAction(
+      @Nullable Set<DataSegment> segmentsToBeOverwritten,
+      @Nullable Set<DataSegment> segmentsToBeDropped,
+      Set<DataSegment> segmentsToPublish
+  )
+  {
+    return new SegmentTransactionalReplaceAction(segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, null, null, null);
+  }
+
+  @JsonCreator
+  private SegmentTransactionalReplaceAction(
+      @JsonProperty("segmentsToBeOverwritten") @Nullable Set<DataSegment> segmentsToBeOverwritten,
+      @JsonProperty("segmentsToBeDropped") @Nullable Set<DataSegment> segmentsToBeDropped,
+      @JsonProperty("segments") @Nullable Set<DataSegment> segments,
+      @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
+      @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
+      @JsonProperty("dataSource") @Nullable String dataSource
+  )
+  {
+    this.segmentsToBeOverwritten = segmentsToBeOverwritten;
+    this.segmentsToBeDropped = segmentsToBeDropped;
+    this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments);
+    this.startMetadata = startMetadata;
+    this.endMetadata = endMetadata;
+    this.dataSource = dataSource;
+  }
+
+  @JsonProperty
+  @Nullable
+  public Set<DataSegment> getSegmentsToBeOverwritten()
+  {
+    return segmentsToBeOverwritten;
+  }
+
+  @JsonProperty
+  @Nullable
+  public Set<DataSegment> getSegmentsToBeDropped()
+  {
+    return segmentsToBeDropped;
+  }
+
+  @JsonProperty
+  public Set<DataSegment> getSegments()
+  {
+    return segments;
+  }
+
+  @JsonProperty
+  @Nullable
+  public DataSourceMetadata getStartMetadata()
+  {
+    return startMetadata;
+  }
+
+  @JsonProperty
+  @Nullable
+  public DataSourceMetadata getEndMetadata()
+  {
+    return endMetadata;
+  }
+
+  @JsonProperty
+  @Nullable
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @Override
+  public TypeReference<SegmentPublishResult> getReturnTypeReference()
+  {
+    return new TypeReference<SegmentPublishResult>()
+    {
+    };
+  }
+
+  /**
+   * Performs some sanity checks and publishes the given segments.
+   */
+  @Override
+  public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
+  {
+    final SegmentPublishResult retVal;
+
+    if (segments.isEmpty()) {
+      // A stream ingestion task didn't ingest any rows and created no segments (e.g., all records were unparseable),
+      // but still needs to update metadata with the progress that the task made.
+      try {
+        retVal = toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly(
+            dataSource,
+            startMetadata,
+            endMetadata
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return retVal;
+    }
+
+    final Set<DataSegment> allSegments = new HashSet<>(segments);
+
+    TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), allSegments);
+
+    String datasource = task.getDataSource();
+    Map<Interval, TaskLock> replaceLocks = new HashMap<>();
+    for (TaskLock lock : TaskLocks.findReplaceLocksForSegments(datasource, toolbox.getTaskLockbox(), segments)) {

Review Comment:
   The replace locks are used to find segments that need to be carried forward to the version of the replace lock.
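
   To make that concrete, here is a small hedged illustration of the containment rule driving this (mirroring the mapping loop in the append action quoted earlier); the intervals and version below are made-up values:

       // Hedged illustration with assumed values only.
       Interval replaceLockInterval = Intervals.of("2023-01-01/2023-02-01");
       String replaceLockVersion = "2023-08-01T00:00:00.000Z"; // version held by the REPLACE lock
       Interval appendedSegmentInterval = Intervals.of("2023-01-15/2023-01-16");

       boolean carriedForward = replaceLockInterval.contains(appendedSegmentInterval);
       // true here: the appended segment is covered by the replace lock, so it is associated
       // with replaceLockVersion and carried forward when the replacing segments are committed.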



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TimeChunkLockRequest.java:
##########
@@ -115,6 +115,9 @@ public Interval getInterval()
   @Override
   public String getVersion()
   {
+    if (lockType.equals(TaskLockType.APPEND) && preferredVersion == null) {
+      return "1970-01-01T00:00:00.000Z";
+    }
     return preferredVersion == null ? DateTimes.nowUtc().toString() : preferredVersion;

Review Comment:
   Done
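
   A hedged reading of that change (an interpretation, not stated in the PR): an APPEND lock acquired without a preferred version now gets the epoch as its version, i.e. the lowest possible version string, instead of the current time. A tiny illustration, with the constructor arguments below being assumptions:

       // Hedged illustration; the exact TimeChunkLockRequest constructor arguments are assumed.
       TimeChunkLockRequest appendRequest = new TimeChunkLockRequest(
           TaskLockType.APPEND,
           task,                                      // the requesting task
           Intervals.of("2023-01-01/2023-02-01"),
           null                                       // no preferred version
       );
       String version = appendRequest.getVersion();   // "1970-01-01T00:00:00.000Z" after this change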



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -259,6 +263,16 @@ SegmentIdWithShardSpec allocatePendingSegment(
   * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
   * @throws RuntimeException         if the state of metadata storage after this call is unknown
    */
+  SegmentPublishResult announceHistoricalSegments(
+      Set<DataSegment> segments,
+      Set<DataSegment> segmentsToDrop,
+      @Nullable DataSourceMetadata startMetadata,
+      @Nullable DataSourceMetadata endMetadata,
+      @Nullable Map<DataSegment, TaskLockInfo> segmentLockMap,
+      @Nullable Set<TaskLockInfo> taskLockInfos,
+      boolean append
+  ) throws IOException;
+

Review Comment:
   Done


