kfaraz commented on code in PR #15085:
URL: https://github.com/apache/druid/pull/15085#discussion_r1347227322
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -121,9 +122,20 @@ default Collection<DataSegment> retrieveUsedSegmentsForInterval(
Collection<DataSegment> retrieveUsedSegmentsForIntervals(
String dataSource,
List<Interval> intervals,
- Segments visibility
+ Segments visibility,
+ String createdBefore
);
+ default Collection<DataSegment> retrieveUsedSegmentsForIntervals(
Review Comment:
Don't add this default method.
Instead, add a new method that caters to the new action. The new method
should accept a single interval instead of a list.
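One way to read this suggestion is sketched below in plain Java. It uses hypothetical stand-in types (`SketchInterval`, `SketchSegment`, `SketchSegmentStore`) instead of the real Druid classes. The existing list-based method keeps its original signature, and the new action gets its own abstract method that takes a single interval. The method name `retrieveUsedSegmentsCreatedBefore` is an assumption for illustration, as are the cutoff semantics (ISO-8601 strings compared lexicographically, with `null` meaning "no cutoff").

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for org.joda.time.Interval: [startMillis, endMillis).
final class SketchInterval {
  final long startMillis;
  final long endMillis;

  SketchInterval(long startMillis, long endMillis) {
    this.startMillis = startMillis;
    this.endMillis = endMillis;
  }

  boolean overlaps(SketchInterval other) {
    return startMillis < other.endMillis && other.startMillis < endMillis;
  }
}

// Hypothetical stand-in for DataSegment, keeping only what the sketch needs.
final class SketchSegment {
  final String dataSource;
  final SketchInterval interval;
  final String createdDate; // ISO-8601 timestamp string (assumption)

  SketchSegment(String dataSource, SketchInterval interval, String createdDate) {
    this.dataSource = dataSource;
    this.interval = interval;
    this.createdDate = createdDate;
  }
}

interface SketchSegmentStore {
  // Existing method: signature unchanged, no extra parameter, no default overload.
  Collection<SketchSegment> retrieveUsedSegmentsForIntervals(String dataSource, List<SketchInterval> intervals);

  // New, dedicated method for the new action (hypothetical name): one interval
  // plus an optional cutoff; a null cutoff means every used segment qualifies.
  Collection<SketchSegment> retrieveUsedSegmentsCreatedBefore(
      String dataSource,
      SketchInterval interval,
      String createdBefore
  );
}

// In-memory implementation, only to make the sketch runnable.
final class InMemorySegmentStore implements SketchSegmentStore {
  private final List<SketchSegment> usedSegments;

  InMemorySegmentStore(List<SketchSegment> usedSegments) {
    this.usedSegments = usedSegments;
  }

  @Override
  public Collection<SketchSegment> retrieveUsedSegmentsForIntervals(String dataSource, List<SketchInterval> intervals) {
    List<SketchSegment> result = new ArrayList<>();
    for (SketchSegment segment : usedSegments) {
      if (!segment.dataSource.equals(dataSource)) {
        continue;
      }
      for (SketchInterval interval : intervals) {
        if (segment.interval.overlaps(interval)) {
          result.add(segment);
          break; // add each segment at most once
        }
      }
    }
    return result;
  }

  @Override
  public Collection<SketchSegment> retrieveUsedSegmentsCreatedBefore(
      String dataSource,
      SketchInterval interval,
      String createdBefore
  ) {
    List<SketchSegment> result = new ArrayList<>();
    for (SketchSegment segment : usedSegments) {
      boolean matches = segment.dataSource.equals(dataSource)
                        && segment.interval.overlaps(interval)
                        && (createdBefore == null || segment.createdDate.compareTo(createdBefore) < 0);
      if (matches) {
        result.add(segment);
      }
    }
    return result;
  }
}
```

Making the new method abstract rather than `default` means every implementation, including test fakes, has to decide how to serve the new action. An implementation cannot silently inherit an overload.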
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveLockedSegmentsAction.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This TaskAction returns a collection of segments which have data within the specified interval and are marked as
+ * used, and have been created before a REPLACE lock, if any, was acquired.
+ *
+ * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
+ * the collection only once.
+ *
+ * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
+ * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
+ * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
+ */
+public class RetrieveLockedSegmentsAction implements TaskAction<Collection<DataSegment>>
+{
+ @JsonIgnore
+ private final String dataSource;
+
+ @JsonIgnore
+ private final Interval interval;
+
+ @JsonIgnore
+ private final Segments visibility;
+
+ @JsonCreator
+ public RetrieveLockedSegmentsAction(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ // When JSON object is deserialized, this parameter is optional for backward compatibility.
Review Comment:
This is a new action. We need not account for backward compatibility here.
We should either make the `visibility` field not nullable or just remove this
field altogether (as it will only be used in one place in `DruidInputSource`)
and always pass `ONLY_VISIBLE` to the underlying `IndexerSQLCoordinator`
methods.
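Below is a minimal sketch of the second option, in which the action drops the `visibility` field entirely. The class name follows the rename suggested later in this review. Jackson annotations are omitted so the sketch compiles on its own, and the interval is a plain ISO-8601 string standing in for `org.joda.time.Interval`. `SegmentsStub` is a hypothetical local enum that mirrors the `Segments` constants.

```java
import java.util.Objects;

// Hypothetical local mirror of org.apache.druid.indexing.overlord.Segments.
enum SegmentsStub { ONLY_VISIBLE, INCLUDING_OVERSHADOWED }

final class RetrieveSegmentsToReplaceActionSketch {
  private final String dataSource;
  private final String interval; // stand-in for org.joda.time.Interval

  RetrieveSegmentsToReplaceActionSketch(String dataSource, String interval) {
    // New action, so there is no backward-compatible defaulting: both fields are required.
    this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
    this.interval = Objects.requireNonNull(interval, "interval");
  }

  String getDataSource() {
    return dataSource;
  }

  String getInterval() {
    return interval;
  }

  // Not a serialized field: the underlying query always uses ONLY_VISIBLE.
  SegmentsStub visibility() {
    return SegmentsStub.ONLY_VISIBLE;
  }
}
```

With no optional property, there is nothing for deserialization to default. The single caller in `DruidInputSource` then cannot ask for a different visibility by accident.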
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveLockedSegmentsAction.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * This TaskAction returns a collection of segments which have data within the specified interval and are marked as
+ * used, and have been created before a REPLACE lock, if any, was acquired.
+ *
+ * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
+ * the collection only once.
+ *
+ * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
+ * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
+ * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
+ */
+public class RetrieveLockedSegmentsAction implements TaskAction<Collection<DataSegment>>
+{
+ @JsonIgnore
+ private final String dataSource;
+
+ @JsonIgnore
+ private final Interval interval;
+
+ @JsonIgnore
+ private final Segments visibility;
+
+ @JsonCreator
+ public RetrieveLockedSegmentsAction(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ // When JSON object is deserialized, this parameter is optional for backward compatibility.
+ // Otherwise, it shouldn't be considered optional.
+ @JsonProperty("visibility") @Nullable Segments visibility
+ )
+ {
+ this.dataSource = dataSource;
+ this.interval = interval;
+ // Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility
+ this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE;
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @JsonProperty
+ public Segments getVisibility()
+ {
+ return visibility;
+ }
+
+ @Override
+ public TypeReference<Collection<DataSegment>> getReturnTypeReference()
+ {
+ return new TypeReference<Collection<DataSegment>>() {};
+ }
+
+ @Override
+ public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox.getTaskLockbox().findReplaceLocksForTask(task);
+ String createdBefore = null;
+ if (task.getDataSource().equals(dataSource)) {
Review Comment:
This should be a precondition check at the start of this method.
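Here is a minimal sketch of a fail-fast check at the top of `perform()`. It uses a hypothetical `TaskStub` interface in place of Druid's `Task` and throws `IllegalArgumentException` when the dataSources differ.

```java
// Hypothetical stand-in for org.apache.druid.indexing.common.task.Task.
interface TaskStub {
  String getDataSource();
}

final class DataSourcePrecondition {
  private DataSourcePrecondition() {}

  // Intended as the first statement of perform(): reject a mismatched dataSource
  // before any lockbox or metadata-store lookups happen.
  static void checkDataSource(TaskStub task, String actionDataSource) {
    if (!task.getDataSource().equals(actionDataSource)) {
      throw new IllegalArgumentException(String.format(
          "Task dataSource[%s] does not match action dataSource[%s]",
          task.getDataSource(),
          actionDataSource
      ));
    }
  }
}
```

The class already imports Guava (`ImmutableList`), so the same check could be written with Guava's `Preconditions.checkArgument(boolean, String, Object...)`, which uses `%s` placeholders in its message template.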
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveLockedSegmentsAction.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * This TaskAction returns a collection of segments which have data within the specified interval and are marked as
+ * used, and have been created before a REPLACE lock, if any, was acquired.
+ *
+ * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
+ * the collection only once.
+ *
+ * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
+ * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
+ * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
Review Comment:
Not really needed.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveLockedSegmentsAction.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This TaskAction returns a collection of segments which have data within the specified interval and are marked as
+ * used, and have been created before a REPLACE lock, if any, was acquired.
+ *
+ * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
+ * the collection only once.
+ *
+ * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
+ * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
+ * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
+ */
+public class RetrieveLockedSegmentsAction implements TaskAction<Collection<DataSegment>>
+{
+ @JsonIgnore
+ private final String dataSource;
+
+ @JsonIgnore
+ private final Interval interval;
+
+ @JsonIgnore
+ private final Segments visibility;
+
+ @JsonCreator
+ public RetrieveLockedSegmentsAction(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ // When JSON object is deserialized, this parameter is optional for backward compatibility.
+ // Otherwise, it shouldn't be considered optional.
+ @JsonProperty("visibility") @Nullable Segments visibility
+ )
+ {
+ this.dataSource = dataSource;
+ this.interval = interval;
+ // Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility
Review Comment:
This is a new action, we don't need backward compatibility here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveLockedSegmentsAction.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This TaskAction returns a collection of segments which have data within the specified interval and are marked as
+ * used, and have been created before a REPLACE lock, if any, was acquired.
+ *
+ * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
+ * the collection only once.
+ *
+ * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
+ * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
+ * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
+ */
+public class RetrieveLockedSegmentsAction implements TaskAction<Collection<DataSegment>>
+{
+ @JsonIgnore
+ private final String dataSource;
+
+ @JsonIgnore
+ private final Interval interval;
+
+ @JsonIgnore
+ private final Segments visibility;
+
+ @JsonCreator
+ public RetrieveLockedSegmentsAction(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval,
+ // When JSON object is deserialized, this parameter is optional for backward compatibility.
+ // Otherwise, it shouldn't be considered optional.
+ @JsonProperty("visibility") @Nullable Segments visibility
+ )
+ {
+ this.dataSource = dataSource;
+ this.interval = interval;
+ // Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility
+ this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE;
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @JsonProperty
+ public Segments getVisibility()
+ {
+ return visibility;
+ }
+
+ @Override
+ public TypeReference<Collection<DataSegment>> getReturnTypeReference()
+ {
+ return new TypeReference<Collection<DataSegment>>() {};
+ }
+
+ @Override
+ public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox.getTaskLockbox().findReplaceLocksForTask(task);
+ String createdBefore = null;
Review Comment:
```suggestion
String lockVersion = null;
```
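The name `lockVersion` fits if the value assigned to this variable is the version of the task's REPLACE lock that covers `interval`. That is an assumption here, because the hunk ends before the assignment. The sketch below shows that lookup with a hypothetical `ReplaceLockStub` in place of `ReplaceTaskLock`, with intervals modeled as `[startMillis, endMillis)`.

```java
import java.util.List;

// Hypothetical stand-in for ReplaceTaskLock: locked interval plus lock version.
final class ReplaceLockStub {
  final long startMillis;
  final long endMillis;
  final String version;

  ReplaceLockStub(long startMillis, long endMillis, String version) {
    this.startMillis = startMillis;
    this.endMillis = endMillis;
    this.version = version;
  }

  // True if this lock's interval fully contains [start, end).
  boolean covers(long start, long end) {
    return startMillis <= start && end <= endMillis;
  }
}

final class LockVersionSketch {
  private LockVersionSketch() {}

  // Returns the version of a REPLACE lock covering [start, end), or null when the
  // task holds no such lock, in which case no created-before cutoff applies.
  static String findLockVersion(List<ReplaceLockStub> replaceLocks, long start, long end) {
    for (ReplaceLockStub lock : replaceLocks) {
      if (lock.covers(start, end)) {
        return lock.version;
      }
    }
    return null;
  }
}
```

A `null` result keeps the "if any" case from the class Javadoc explicit. Without a covering REPLACE lock, all used segments in the interval are returned.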
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveLockedSegmentsAction.java:
##########
@@ -0,0 +1,158 @@
+/**
+ * This TaskAction returns a collection of segments which have data within the specified interval and are marked as
+ * used, and have been created before a REPLACE lock, if any, was acquired.
+ *
+ * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
+ * the collection only once.
+ *
+ * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
+ * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
+ * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
+ */
+public class RetrieveLockedSegmentsAction implements TaskAction<Collection<DataSegment>>
Review Comment:
You could call this `RetrieveSegmentsToReplaceAction` to avoid confusion.
Technically, even the segments created after the lock was acquired are still
locked, but they will not be replaced.
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java:
##########
@@ -102,12 +102,21 @@ public static SqlSegmentsMetadataQuery forHandle(
*
* Returns a closeable iterator. You should close it when you are done.
*/
+ public CloseableIterator<DataSegment> retrieveUsedSegments(
+ final String dataSource,
+ final Collection<Interval> intervals,
Review Comment:
Since it is a new method, we might as well just pass a single Interval here
rather than a collection.
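With a single `Interval`, the query needs only one pair of start/end bind parameters rather than one pair per interval in a collection. The sketch below builds such a query string. The table and column names (`druid_segments`, `used`, `dataSource`, `start`, `end`, `created_date`, `payload`) and the bind-parameter names are assumptions for illustration, and the quoting of `end` is shown naively. It is not the actual `SqlSegmentsMetadataQuery` implementation.

```java
final class SingleIntervalQuerySketch {
  private SingleIntervalQuerySketch() {}

  // Builds a hypothetical query for used segments of one dataSource that overlap
  // a single interval, optionally restricted to segments created before a cutoff.
  static String buildUsedSegmentsSql(String segmentsTable, boolean filterByCreatedDate) {
    StringBuilder sql = new StringBuilder()
        .append("SELECT payload FROM ").append(segmentsTable)
        .append(" WHERE used = :used AND dataSource = :dataSource")
        // Overlap test, assuming start/end are stored as comparable ISO-8601 strings:
        // segment.start < interval.end AND segment.end > interval.start
        .append(" AND start < :end AND \"end\" > :start");
    if (filterByCreatedDate) {
      sql.append(" AND created_date < :createdBefore");
    }
    return sql.toString();
  }
}
```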