errose28 commented on code in PR #3788: URL: https://github.com/apache/ozone/pull/3788#discussion_r993639556
########## hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.verification.VerificationMode; + +import java.io.IOException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the on-demand container scanner. + */ +@RunWith(MockitoJUnitRunner.class) +public class TestOnDemandContainerScanner { + + private final AtomicLong containerIdSeq = new AtomicLong(100); + + @Mock + private Container<ContainerData> healthy; + + @Mock + private Container<ContainerData> openContainer; + + @Mock + private Container<ContainerData> corruptData; + + private ContainerScannerConfiguration conf; + private ContainerController controller; + + @Before + public void setup() { + conf = newInstanceOf(ContainerScannerConfiguration.class); + conf.setMetadataScanInterval(0); + conf.setDataScanInterval(0); + controller = mockContainerController(); + } + + @After + public void tearDown() { + OnDemandContainerScanner.shutdown(); + } + + @Test + public void testOnDemandContainerScanner() throws Exception { + //Without initialization, + // there shouldn't be interaction with containerController + OnDemandContainerScanner.scanContainer(corruptData); + Mockito.verifyZeroInteractions(controller); + OnDemandContainerScanner.init(conf, controller); + testContainerMarkedUnhealthy(healthy, never()); + testContainerMarkedUnhealthy(corruptData, atLeastOnce()); + testContainerMarkedUnhealthy(openContainer, never()); + } + + @Test + public void testContainerScannerMultipleInitsAndShutdowns() throws Exception { + OnDemandContainerScanner.init(conf, controller); + OnDemandContainerScanner.init(conf, controller); + OnDemandContainerScanner.shutdown(); + OnDemandContainerScanner.shutdown(); + //There shouldn't be an interaction after shutdown: + testContainerMarkedUnhealthy(corruptData, never()); + } + + private void testContainerMarkedUnhealthy( + Container<?> container, VerificationMode invocationTimes) + throws IOException { + OnDemandContainerScanner.scanContainer(container); + waitForScanToFinish(); + Mockito.verify(controller, invocationTimes).markContainerUnhealthy( + container.getContainerData().getContainerID()); + } + + private void waitForScanToFinish() { Review Comment: Methods like these that are duplicated between this test and TestContainerScannerMetrics can be moved to ContainerTestUtils. ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java: ########## @@ -307,29 +307,27 @@ private void buildContainerSet() { private void startContainerScrub() { ContainerScannerConfiguration c = config.getObject( ContainerScannerConfiguration.class); - boolean enabled = c.isEnabled(); - - if (!enabled) { + if (!c.isEnabled()) { LOG.info("Background container scanner has been disabled."); - } else { - if (this.metadataScanner == null) { - this.metadataScanner = new ContainerMetadataScanner(c, controller); - } - this.metadataScanner.start(); - - if (c.getBandwidthPerVolume() == 0L) { - LOG.warn(VOLUME_BYTES_PER_SECOND_KEY + " is set to 0, " + - "so background container data scanner will not start."); - return; - } + return; + } + OnDemandContainerScanner.init(c, controller); + if (this.metadataScanner == null) { + this.metadataScanner = new ContainerMetadataScanner(c, controller); + } + this.metadataScanner.start(); - dataScanners = new ArrayList<>(); - for (StorageVolume v : volumeSet.getVolumesList()) { - ContainerDataScanner s = new ContainerDataScanner(c, controller, - (HddsVolume) v); - s.start(); - dataScanners.add(s); - } + if (c.getBandwidthPerVolume() == 0L) { Review Comment: We should probably have a similar check and warning for the on demand scanner. Since it is not a background service we can't just not start it like the other scanners. There would probably need to be a check in `OnDemandContainerScanner#scanContainer` to not run if the bandwidth is 0. ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java: ########## @@ -307,29 +307,27 @@ private void buildContainerSet() { private void startContainerScrub() { ContainerScannerConfiguration c = config.getObject( ContainerScannerConfiguration.class); - boolean enabled = c.isEnabled(); - - if (!enabled) { + if (!c.isEnabled()) { LOG.info("Background container scanner has been disabled."); Review Comment: Currently this uses one enable flag for on demand and background scanners. I think this is fine for simplicity. ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Class for performing on demand scans of containers. + */ +public final class OnDemandContainerScanner { + public static final Logger LOG = + LoggerFactory.getLogger(OnDemandContainerScanner.class); + + private static volatile OnDemandContainerScanner instance; + + private final ExecutorService scanExecutor; + private final ContainerController containerController; + private final DataTransferThrottler throttler; + private final Canceler canceler; + private final ConcurrentHashMap + .KeySetView<Container<?>, Boolean> toBeScannedContainers; Review Comment: It looks like this is being used to remove duplicates from the queue. I think we should add a comment explaining this. ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java: ########## @@ -373,11 +368,21 @@ private ContainerCommandResponseProto dispatchRequest( container.getContainerData().getState() == State.UNHEALTHY); sendCloseContainerActionIfNeeded(container); } - + if (cmdType == Type.CreateContainer + && result == Result.SUCCESS && dispatcherContext != null) { + Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap()); + container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0)); + } if (result == Result.SUCCESS) { updateBCSID(container, dispatcherContext, cmdType); audit(action, eventType, params, AuditEventStatus.SUCCESS, null); } else { + //TODO HDDS-7096: + // This is a too general place for on demand scanning. + // Create a specific exception that signals for on demand scanning + // and move this general scan to where it is more appropriate. + // Add integration tests to test the full functionality. + OnDemandContainerScanner.scanContainer(container); Review Comment: I agree this is too general but it is ok for now. After reviewing the code I am again leaning towards an AOP implementation in HDDS-7096 like we discussed offline. - Methods whose failure should trigger scanning would: 1. Throw a specific type of exception when a scan is required 2. Be annotated to mark the behavior, providing a link for casual readers between the method and the aspect. - An aspect would be created to queue containers for scanning using the static on demand scanning utility in this PR. - The aspect's point cut would be methods marked with the annotation that throw the special exception. - The aspect's advice would be to on demand scan the container. The container could be collected from the method arguments or the target of the method invocation. This approach could also be used to improve the volume scanner, which currently has `onFailure` calls in try/catches spread throughout the code. We will end up in the same situation with the on demand scanner otherwise. Also I think we should have every on demand scan do an on demand volume check since those are cheap and the volume scanner already drops frequent repeat scans for us. Anyways these are future ideas and not actionable in this PR, I just wanted to put them out there. ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Class for performing on demand scans of containers. + */ +public final class OnDemandContainerScanner { + public static final Logger LOG = + LoggerFactory.getLogger(OnDemandContainerScanner.class); + + private static volatile OnDemandContainerScanner instance; + + private final ExecutorService scanExecutor; + private final ContainerController containerController; + private final DataTransferThrottler throttler; + private final Canceler canceler; + private final ConcurrentHashMap + .KeySetView<Container<?>, Boolean> toBeScannedContainers; + private final OnDemandScannerMetrics metrics; + @VisibleForTesting + private static Future<?> lastScanFuture; + + private OnDemandContainerScanner( + ContainerScannerConfiguration conf, ContainerController controller) { + containerController = controller; + throttler = new DataTransferThrottler( + conf.getOnDemandBandwidthPerVolume()); + canceler = new Canceler(); + metrics = OnDemandScannerMetrics.create(); + scanExecutor = Executors.newSingleThreadExecutor(); + toBeScannedContainers = ConcurrentHashMap.newKeySet(); + } + + public static synchronized void init( + ContainerScannerConfiguration conf, ContainerController controller) { + if (instance != null) { + LOG.warn("Trying to initialize on demand scanner" + + " a second time on a datanode."); + return; + } + instance = new OnDemandContainerScanner(conf, controller); + } + + public static void scanContainer(Container<?> container) { + if (instance == null) { + return; + } + if (container.shouldScanData() && + instance.toBeScannedContainers.add(container)) { + lastScanFuture = instance.scanExecutor.submit(() -> { + instance.toBeScannedContainers.remove(container); + if (container.shouldScanData()) { + performOnDemandScan(container); + } + }); + } + } + + private static void performOnDemandScan(Container<?> container) { + long containerId = container.getContainerData().getContainerID(); + try { + ContainerData containerData = container.getContainerData(); + logScanStart(containerData); + if (container.scanData(instance.throttler, instance.canceler)) { + Instant now = Instant.now(); + logScanCompleted(containerData, now); + instance.containerController.updateDataScanTimestamp(containerId, now); + } else { + instance.containerController.markContainerUnhealthy(containerId); + instance.metrics.incNumUnHealthyContainers(); + } + instance.metrics.incNumContainersScanned(); + } catch (IOException e) { + LOG.warn("Unexpected exception while scanning container " + + containerId, e); + } + } + + private static void logScanStart(ContainerData containerData) { + if (LOG.isDebugEnabled()) { + Optional<Instant> scanTimestamp = containerData.lastDataScanTime(); + Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never"); + LOG.debug("Scanning container {}, last scanned {}", + containerData.getContainerID(), lastScanTime); + } + } + + private static void logScanCompleted( + ContainerData containerData, Instant timestamp) { + if (LOG.isDebugEnabled()) { + LOG.debug("Completed scan of container {} at {}", + containerData.getContainerID(), timestamp); + } + } + + public static OnDemandScannerMetrics getMetrics() { + return instance.metrics; + } + + public static synchronized void shutdown() { + if (instance == null) { + return; + } + instance.shutdownScanner(); + } + + private synchronized void shutdownScanner() { + instance = null; + metrics.unregister(); + if (!scanExecutor.isShutdown()) { + scanExecutor.shutdown(); + } + try { + long timeoutSeconds = 5; + if (!scanExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) { + LOG.warn("On demand scanner shut down forcefully after {} seconds", + timeoutSeconds); + scanExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.warn("On demand scanner interrupted while waiting for shut down."); + scanExecutor.shutdownNow(); + throw new RuntimeException(e); + } + } + + @VisibleForTesting + public static Future<?> getLastScanFuture() { + return lastScanFuture; + } Review Comment: How about just having scanContainer return the future instead? It's void right now so the extra information can be used only when needed (like tests). ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java: ########## @@ -307,29 +307,27 @@ private void buildContainerSet() { private void startContainerScrub() { ContainerScannerConfiguration c = config.getObject( ContainerScannerConfiguration.class); - boolean enabled = c.isEnabled(); - - if (!enabled) { + if (!c.isEnabled()) { LOG.info("Background container scanner has been disabled."); Review Comment: ```suggestion LOG.info("Container scanners have been disabled."); ``` ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java: ########## @@ -45,12 +45,16 @@ public class ContainerScannerConfiguration { "hdds.container.scrub.data.scan.interval"; public static final String VOLUME_BYTES_PER_SECOND_KEY = "hdds.container.scrub.volume.bytes.per.second"; + public static final String ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY = + "hdds.container.scrub.on.demand.volume.bytes.per.second"; public static final long METADATA_SCAN_INTERVAL_DEFAULT = Duration.ofHours(3).toMillis(); public static final long DATA_SCAN_INTERVAL_DEFAULT = Duration.ofDays(7).toMillis(); public static final long BANDWIDTH_PER_VOLUME_DEFAULT = 1048576; // 1MB + // 1MB + public static final long ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT = 1048576; Review Comment: We had discussed raising the default to 5mb for the background scanner. Should we do the same for the on demand scanner? ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java: ########## @@ -561,11 +566,18 @@ private boolean isContainerFull(Container container) { private boolean isContainerUnhealthy(Container container) { return Optional.ofNullable(container).map( - cont -> (cont.getContainerState() == - ContainerDataProto.State.UNHEALTHY)) + cont -> (cont.getContainerState() == + ContainerDataProto.State.UNHEALTHY)) .orElse(Boolean.FALSE); } + private void scanContainerIfNeeded(Container<?> container) { + if (container.getContainerState() == State.CLOSED || + container.getContainerState() == State.QUASI_CLOSED) { Review Comment: OnDemandScanner#scanContainer already has a similar check, can we remove this method? ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandScannerMetrics.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + +/** + * This class captures the on-demand container data scanner metrics. + **/ [email protected] +@Metrics(about = "On-demand container data scanner metrics", context = "dfs") +public final class OnDemandScannerMetrics + extends AbstractContainerScannerMetrics { Review Comment: Looks like the numScanIterations metric will not be used by the on demand scanner. We could leave it at zero like it is now, or pull it down into the two background scanner classes. ########## hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.verification.VerificationMode; + +import java.io.IOException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the on-demand container scanner. + */ +@RunWith(MockitoJUnitRunner.class) +public class TestOnDemandContainerScanner { Review Comment: In addition to this unit test, it would be good to have an integration test that verifies a client reading or writing to a corrupted container causes it to be queued for scanning, and eventually marked unhealthy and reported to SCM. `TestDataScanner` does this kind of thing for the background metadata scanner, and it could be expanded to test this case as well. ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Class for performing on demand scans of containers. + */ +public final class OnDemandContainerScanner { + public static final Logger LOG = + LoggerFactory.getLogger(OnDemandContainerScanner.class); + + private static volatile OnDemandContainerScanner instance; + + private final ExecutorService scanExecutor; + private final ContainerController containerController; + private final DataTransferThrottler throttler; + private final Canceler canceler; + private final ConcurrentHashMap + .KeySetView<Container<?>, Boolean> toBeScannedContainers; + private final OnDemandScannerMetrics metrics; + @VisibleForTesting Review Comment: nit. This annotation is not needed here since the field is private. ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Class for performing on demand scans of containers. + */ +public final class OnDemandContainerScanner { + public static final Logger LOG = + LoggerFactory.getLogger(OnDemandContainerScanner.class); + + private static volatile OnDemandContainerScanner instance; + + private final ExecutorService scanExecutor; + private final ContainerController containerController; + private final DataTransferThrottler throttler; + private final Canceler canceler; + private final ConcurrentHashMap + .KeySetView<Container<?>, Boolean> toBeScannedContainers; Review Comment: Also I don't see hashcode or equals for the Container class. Using container ID as the key might be better. ########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Class for performing on demand scans of containers. + */ +public final class OnDemandContainerScanner { + public static final Logger LOG = + LoggerFactory.getLogger(OnDemandContainerScanner.class); + + private static volatile OnDemandContainerScanner instance; + + private final ExecutorService scanExecutor; + private final ContainerController containerController; + private final DataTransferThrottler throttler; + private final Canceler canceler; + private final ConcurrentHashMap + .KeySetView<Container<?>, Boolean> toBeScannedContainers; Review Comment: A test case in TestOnDemandContainerScanner to test that duplicates are removed would be a good addition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
