kfaraz commented on code in PR #18436:
URL: https://github.com/apache/druid/pull/18436#discussion_r2299659909
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1559,6 +1567,22 @@ public Void apply(@Nullable Object input)
);
}
+ private void unlockIntervalIfApplicable(Sink abandonedSink)
+ {
+ Interval abandonedInterval = abandonedSink.getInterval();
+ boolean isIntervalActive = sinks.entrySet().stream()
+ .anyMatch(entry -> {
+ Sink sink = entry.getValue();
+ return !Objects.equals(sink, abandonedSink)
+ && sink.isWritable()
+ && sink.getInterval().equals(abandonedInterval);
Review Comment:
We should check for `overlaps` here rather than `equals`.
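
To illustrate the difference, here is a minimal sketch. It assumes the concern is a writable sink whose interval intersects the abandoned one without being identical to it. `SimpleInterval` and `OverlapCheckSketch` are hypothetical stand-ins so the snippet compiles on its own; the real code would call `overlaps` on `org.joda.time.Interval`.

```java
import java.util.List;

// Hypothetical stand-in for org.joda.time.Interval: a half-open range [start, end) in millis.
final class SimpleInterval
{
  final long start;
  final long end;

  SimpleInterval(long start, long end)
  {
    this.start = start;
    this.end = end;
  }

  // Two non-empty half-open ranges overlap when each starts before the other ends.
  boolean overlaps(SimpleInterval other)
  {
    return start < other.end && other.start < end;
  }
}

final class OverlapCheckSketch
{
  // Returns true if any interval in the list shares time with the abandoned one.
  static boolean isIntervalActive(SimpleInterval abandoned, List<SimpleInterval> writableIntervals)
  {
    return writableIntervals.stream().anyMatch(interval -> interval.overlaps(abandoned));
  }
}
```

With this check, an abandoned hourly interval would still count as active while a writable sink covers the enclosing day, whereas an interval that merely abuts it would not.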
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskLockCallback.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.joda.time.Interval;
+
+/**
+ * This interface provides a callback mechanism to interact with TaskLockbox for releasing interval locks when
+ * the segments are handed off.
+ */
+public interface TaskLockCallback
Review Comment:
Maybe mention in the javadoc that this interface is needed only because `TaskLockbox` is in the `indexing-service` module.
Maybe rename the interface to `TaskIntervalUnlocker`, since it is only going to be used for unlocking intervals.
Also, please annotate it with `@FunctionalInterface`.
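
A rough sketch of what the interface could look like with these suggestions applied. The `String` parameter is an assumption made only to keep the snippet self-contained (the PR uses `org.joda.time.Interval`), and `UnlockerUsageSketch` is a hypothetical helper for demonstration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Releases the lock held on an interval once its segments are handed off.
 * Needed only because TaskLockbox is in the indexing-service module.
 */
@FunctionalInterface
interface TaskIntervalUnlocker
{
  // Single abstract method with no default body. The real parameter type is
  // org.joda.time.Interval; String is used here only for the sketch.
  void releaseLock(String interval);
}

final class UnlockerUsageSketch
{
  // Records each released interval instead of talking to a real lockbox.
  static List<String> releaseAll(List<String> intervals)
  {
    List<String> released = new ArrayList<>();
    TaskIntervalUnlocker unlocker = released::add;
    for (String interval : intervals) {
      unlocker.releaseLock(interval);
    }
    return released;
  }
}
```

Note that `@FunctionalInterface` requires exactly one abstract method, so the annotation will not compile if the only method has a `default` body.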
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskLockCallback.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.joda.time.Interval;
+
+/**
+ * This interface provides a callback mechanism to interact with TaskLockbox for releasing interval locks when
+ * the segments are handed off.
+ */
+public interface TaskLockCallback
+{
+ /**
+ * Releases the lock for the given interval.
+ *
+ * @param interval interval for which the lock needs to be released
+ */
+ default void releaseLock(Interval interval)
Review Comment:
This is a new functional interface, so we shouldn't need to provide a default implementation for the method.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLockCallbackImpl.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import org.apache.druid.segment.realtime.appenderator.TaskLockCallback;
+import org.joda.time.Interval;
+
+public class TaskLockCallbackImpl implements TaskLockCallback
Review Comment:
We shouldn't need a separate class for this implementation; we should just inline it.
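
As a sketch of the inlining idea, the callback can be built as a lambda at the call site. Everything below is a hypothetical stand-in: `FakeLockHolder` and `InlineCallbackSketch` are invented for illustration, and the local `TaskLockCallback` uses a `String` interval and an abstract method so the snippet compiles on its own.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the interface under review; intervals are plain strings here.
@FunctionalInterface
interface TaskLockCallback
{
  void releaseLock(String interval);
}

// Hypothetical stand-in for whatever component actually holds the locks.
final class FakeLockHolder
{
  final Set<String> lockedIntervals = new HashSet<>();

  void unlock(String interval)
  {
    lockedIntervals.remove(interval);
  }
}

final class InlineCallbackSketch
{
  // The callback is created where it is needed; no separate Impl class is required.
  static TaskLockCallback createCallback(FakeLockHolder holder)
  {
    return interval -> holder.unlock(interval);
  }
}
```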
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java:
##########
@@ -57,11 +57,13 @@ public class SeekableStreamAppenderatorConfig implements AppenderatorConfig
private final SeekableStreamIndexTaskTuningConfig tuningConfig;
private final int maxColumnsToMerge;
+ private final boolean shouldReleaseLockOnHandoff;
Review Comment:
We probably don't need an extra field or method here; just access it via the tuning config.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java:
##########
@@ -146,6 +149,9 @@ public SeekableStreamIndexTaskTuningConfig(
this.numPersistThreads = Math.max(numPersistThreads, AppenderatorConfig.DEFAULT_NUM_PERSIST_THREADS);
}
this.maxColumnsToMerge = maxColumnsToMerge == null ? DEFAULT_MAX_COLUMNS_TO_MERGE : maxColumnsToMerge;
+ this.shouldReleaseLockOnHandoff = shouldReleaseLockOnHandoff == null
+ ? DEFAULT_SHOULD_RELEASE_LOCK_ON_HANDOFF
+ : shouldReleaseLockOnHandoff;
Review Comment:
Shorthand:
```suggestion
this.shouldReleaseLockOnHandoff = Configs.valueOrDefault(shouldReleaseLockOnHandoff, DEFAULT_SHOULD_RELEASE_LOCK_ON_HANDOFF);
```
```
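
For reference, a minimal sketch of the null-or-default behavior this shorthand is expected to have. The local `valueOrDefault` helper is a stand-in written for this example, not Druid's `Configs` class, and the default value of `false` is an assumption.

```java
final class ConfigDefaultsSketch
{
  // Assumed default for illustration; the PR defines the actual constant.
  static final boolean DEFAULT_SHOULD_RELEASE_LOCK_ON_HANDOFF = false;

  // Local stand-in: returns the value if it was set, otherwise the default.
  static boolean valueOrDefault(Boolean value, boolean defaultValue)
  {
    return value == null ? defaultValue : value;
  }

  // Mirrors the constructor assignment, where the JSON property may be absent (null).
  static boolean resolveShouldReleaseLockOnHandoff(Boolean configured)
  {
    return valueOrDefault(configured, DEFAULT_SHOULD_RELEASE_LOCK_ON_HANDOFF);
  }
}
```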