kfaraz commented on code in PR #17959: URL: https://github.com/apache/druid/pull/17959#discussion_r2078941779
########## extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.cluster.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; +import com.google.inject.Provider; +import org.apache.druid.client.coordinator.Coordinator; +import org.apache.druid.client.coordinator.CoordinatorClientImpl; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.testing.cluster.ClusterTestingTaskConfig; +import org.joda.time.Duration; + +import java.util.concurrent.ConcurrentHashMap; + +/** + 
* Implementation of {@code CoordinatorClient} that supports the following: + * <ul> + * <li>Add a {@code minSegmentHandoffDelay} before making a call to Coordinator + * to get the handoff status of a single segment.</li> + * </ul> + */ +public class FaultyCoordinatorClient extends CoordinatorClientImpl +{ + private static final Logger log = new Logger(FaultyCoordinatorClient.class); + + private final Provider<ClusterTestingTaskConfig> testConfigProvider; + private final ConcurrentHashMap<SegmentDescriptor, Stopwatch> segmentHandoffTimers = new ConcurrentHashMap<>(); + + @Inject + public FaultyCoordinatorClient( + Provider<ClusterTestingTaskConfig> testingConfigProvider, + @Json final ObjectMapper jsonMapper, + @EscalatedGlobal final ServiceClientFactory clientFactory, + @Coordinator final ServiceLocator serviceLocator + ) + { + super( + clientFactory.makeClient( + NodeRole.COORDINATOR.getJsonName(), + serviceLocator, + StandardRetryPolicy.builder().maxAttempts(6).build() + ), + jsonMapper + ); + this.testConfigProvider = testingConfigProvider; + } + + @Override + public ListenableFuture<Boolean> isHandoffComplete(String dataSource, SegmentDescriptor descriptor) + { + final Duration minHandoffDelay = getHandoffDelay(); + if (minHandoffDelay != null) { + final Stopwatch sinceHandoffCheckStarted = segmentHandoffTimers.computeIfAbsent( + descriptor, + d -> Stopwatch.createStarted() + ); + + if (sinceHandoffCheckStarted.isRunning() + && sinceHandoffCheckStarted.hasElapsed(minHandoffDelay)) { + // Wait period is over, check with Coordinator now + log.info( + "Min handoff delay[%s] has elapsed for segment[%s]. Checking with Coordinator for actual handoff status.", + minHandoffDelay, descriptor + ); + + // Stop the Stopwatch but do not remove it from the map. This ensures + // that we do not create a new Stopwatch causing further delays. 
+ sinceHandoffCheckStarted.stop(); + } else { + // Until the min handoff delay has elapsed, keep returning false + return Futures.immediateFuture(false); + } + } + + // Call Coordinator for the actual handoff status + return super.isHandoffComplete(dataSource, descriptor); Review Comment: ah, good catch! Let me fix it up. ########## extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyMetadataStorageCoordinator.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.cluster.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory; +import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; +import org.apache.druid.segment.metadata.SegmentSchemaManager; + +/** + * Implementation of {@code IndexerMetadataStorageCoordinator} that supports the + * following: + * <ul> + * <li></li> + * </ul> + */ +public class FaultyMetadataStorageCoordinator extends IndexerSQLMetadataStorageCoordinator +{ + private static final Logger log = new Logger(FaultyMetadataStorageCoordinator.class); + + @Inject + public FaultyMetadataStorageCoordinator( + SegmentMetadataTransactionFactory transactionFactory, + ObjectMapper jsonMapper, + MetadataStorageTablesConfig dbTables, + SQLMetadataConnector connector, + SegmentSchemaManager segmentSchemaManager, + CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig + ) + { + super( + transactionFactory, + jsonMapper, + dbTables, + connector, + segmentSchemaManager, + centralizedDatasourceSchemaConfig + ); + log.info("Creating a faulty metadata storage coordinator"); + } + + @Override + @LifecycleStart + public void start() + { + log.info("Starting the faulty metadata storage coordinator"); + super.start(); + } + + @Override + public int deletePendingSegments(String dataSource) + { + return super.deletePendingSegments(dataSource); + } + + @Override + public int deletePendingSegmentsForTaskAllocatorId(String datasource, String taskAllocatorId) + { + log.info("Deleting pending segments with a bit of faulty behaviour"); 
Review Comment: Removed this class as it is currently not being used to inject any faults. ########## extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Provider; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.testing.cluster.ClusterTestingTaskConfig; +import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator; +import org.apache.druid.testing.cluster.overlord.FaultyMetadataStorageCoordinator; +import org.apache.druid.testing.cluster.overlord.FaultyTaskLockbox; +import org.apache.druid.testing.cluster.task.FaultyCoordinatorClient; +import org.apache.druid.testing.cluster.task.FaultyOverlordClient; +import org.apache.druid.testing.cluster.task.FaultyRemoteTaskActionClientFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +/** + * Module that injects faulty clients into the Peon process to simulate various + * fault scenarios. 
+ */ +public class ClusterTestingModule implements DruidModule +{ + private static final Logger log = new Logger(ClusterTestingModule.class); + + private Set<NodeRole> roles; + private boolean isClusterTestingEnabled = false; + + @Inject + public void configure( + Properties props, + @Self Set<NodeRole> roles + ) + { + this.isClusterTestingEnabled = Boolean.parseBoolean( + props.getProperty("druid.unsafe.cluster.testing", "false") + ); + this.roles = roles; + } + + @Override + public void configure(Binder binder) + { + if (isClusterTestingEnabled) { + bindDependenciesForClusterTestingMode(binder); + } + } + + private void bindDependenciesForClusterTestingMode(Binder binder) + { + if (roles.equals(Set.of(NodeRole.PEON))) { + // Bind cluster testing config + binder.bind(ClusterTestingTaskConfig.class) + .toProvider(TestConfigProvider.class) + .in(LazySingleton.class); + + // Bind faulty clients for Coordinator, Overlord and task actions + binder.bind(CoordinatorClient.class) + .to(FaultyCoordinatorClient.class) + .in(LazySingleton.class); + binder.bind(OverlordClient.class) + .to(FaultyOverlordClient.class) + .in(LazySingleton.class); + binder.bind(RemoteTaskActionClientFactory.class) + .to(FaultyRemoteTaskActionClientFactory.class) + .in(LazySingleton.class); + } else if (roles.contains(NodeRole.OVERLORD)) { + // If this is the Overlord, bind a faulty storage coordinator + log.info("Running Overlord in cluster testing mode."); + binder.bind(IndexerSQLMetadataStorageCoordinator.class) + .to(FaultyMetadataStorageCoordinator.class) + .in(ManageLifecycle.class); + binder.bind(TaskLockbox.class) + .to(FaultyTaskLockbox.class) + .in(LazySingleton.class); + } + } + + @Override + public List<? 
extends Module> getJacksonModules() + { + return List.of( + new SimpleModule(getClass().getSimpleName()) + .registerSubtypes(new NamedType(FaultyLagAggregator.class, "faulty")) + ); + } + + private static class TestConfigProvider implements Provider<ClusterTestingTaskConfig> + { + private final Task task; + private final ObjectMapper mapper; + + @Inject + public TestConfigProvider(Task task, ObjectMapper mapper) + { + this.task = task; + this.mapper = mapper; + } + + @Override + public ClusterTestingTaskConfig get() + { + try { + final ClusterTestingTaskConfig testingConfig = ClusterTestingTaskConfig.forTask(task, mapper); + log.info("Running task in cluster testing mode with config[%s].", testingConfig); Review Comment: updated. ########## extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Provider; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.testing.cluster.ClusterTestingTaskConfig; +import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator; +import org.apache.druid.testing.cluster.overlord.FaultyMetadataStorageCoordinator; +import org.apache.druid.testing.cluster.overlord.FaultyTaskLockbox; +import org.apache.druid.testing.cluster.task.FaultyCoordinatorClient; +import org.apache.druid.testing.cluster.task.FaultyOverlordClient; +import org.apache.druid.testing.cluster.task.FaultyRemoteTaskActionClientFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +/** + * Module that injects faulty clients into the Peon process to simulate various + * fault scenarios. 
+ */ +public class ClusterTestingModule implements DruidModule +{ + private static final Logger log = new Logger(ClusterTestingModule.class); + + private Set<NodeRole> roles; + private boolean isClusterTestingEnabled = false; + + @Inject + public void configure( + Properties props, + @Self Set<NodeRole> roles + ) + { + this.isClusterTestingEnabled = Boolean.parseBoolean( + props.getProperty("druid.unsafe.cluster.testing", "false") + ); + this.roles = roles; + } + + @Override + public void configure(Binder binder) + { + if (isClusterTestingEnabled) { + bindDependenciesForClusterTestingMode(binder); + } + } + + private void bindDependenciesForClusterTestingMode(Binder binder) + { + if (roles.equals(Set.of(NodeRole.PEON))) { + // Bind cluster testing config + binder.bind(ClusterTestingTaskConfig.class) + .toProvider(TestConfigProvider.class) + .in(LazySingleton.class); + + // Bind faulty clients for Coordinator, Overlord and task actions + binder.bind(CoordinatorClient.class) + .to(FaultyCoordinatorClient.class) + .in(LazySingleton.class); + binder.bind(OverlordClient.class) + .to(FaultyOverlordClient.class) + .in(LazySingleton.class); + binder.bind(RemoteTaskActionClientFactory.class) + .to(FaultyRemoteTaskActionClientFactory.class) + .in(LazySingleton.class); + } else if (roles.contains(NodeRole.OVERLORD)) { + // If this is the Overlord, bind a faulty storage coordinator + log.info("Running Overlord in cluster testing mode."); Review Comment: I like warn better for this too. ########## extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Provider; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.testing.cluster.ClusterTestingTaskConfig; +import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator; +import org.apache.druid.testing.cluster.overlord.FaultyMetadataStorageCoordinator; +import org.apache.druid.testing.cluster.overlord.FaultyTaskLockbox; +import org.apache.druid.testing.cluster.task.FaultyCoordinatorClient; +import org.apache.druid.testing.cluster.task.FaultyOverlordClient; +import 
org.apache.druid.testing.cluster.task.FaultyRemoteTaskActionClientFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +/** + * Module that injects faulty clients into the Peon process to simulate various + * fault scenarios. + */ +public class ClusterTestingModule implements DruidModule +{ + private static final Logger log = new Logger(ClusterTestingModule.class); + + private Set<NodeRole> roles; + private boolean isClusterTestingEnabled = false; + + @Inject + public void configure( + Properties props, + @Self Set<NodeRole> roles + ) + { + this.isClusterTestingEnabled = Boolean.parseBoolean( + props.getProperty("druid.unsafe.cluster.testing", "false") + ); + this.roles = roles; + } + + @Override + public void configure(Binder binder) + { + if (isClusterTestingEnabled) { Review Comment: Updated. Thanks for the suggestion. ########## indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java: ########## @@ -1259,30 +1259,45 @@ private void cleanupUpgradeAndPendingSegments(Task task) ); } - // Clean up pending segments associated with an APPEND task - if (task instanceof PendingSegmentAllocatingTask) { - final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId(); - if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) { - final Set<String> taskIdsForSameAllocator = activeAllocatorIdToTaskIds.get(taskAllocatorId); - taskIdsForSameAllocator.remove(task.getId()); - - if (taskIdsForSameAllocator.isEmpty()) { - final int pendingSegmentsDeleted = metadataStorageCoordinator - .deletePendingSegmentsForTaskAllocatorId(task.getDataSource(), taskAllocatorId); - log.info( - "Deleted [%d] entries from pendingSegments table for taskAllocatorId[%s].", - pendingSegmentsDeleted, taskAllocatorId - ); - } - activeAllocatorIdToTaskIds.remove(taskAllocatorId); - } - } + cleanupPendingSegments(task); } catch (Exception e) { log.warn(e, "Failure cleaning up 
upgradeSegments or pendingSegments tables."); } } + /** + * Cleans up pending segments associated with an APPEND task. + */ + protected void cleanupPendingSegments(Task task) + { + if (!(task instanceof PendingSegmentAllocatingTask)) { + return; + } + + giant.lock(); Review Comment: Yes, the `activeAllocatorIdToTaskIds` field must be accessed within the `giant` lock. Since this method is now `protected` and can also be called by subclasses, the lock must be acquired within this method itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
