errose28 commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r1003539953
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java:
##########
@@ -373,11 +368,21 @@ private ContainerCommandResponseProto dispatchRequest(
container.getContainerData().getState() == State.UNHEALTHY);
sendCloseContainerActionIfNeeded(container);
}
-
+ if (cmdType == Type.CreateContainer
+ && result == Result.SUCCESS && dispatcherContext != null) {
+ Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+ container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+ }
Review Comment:
What's the reason for moving this code block lower?
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java:
##########
@@ -86,6 +92,15 @@ public class ContainerScannerConfiguration {
+ " by scanner per volume.")
private long bandwidthPerVolume = BANDWIDTH_PER_VOLUME_DEFAULT;
+ @Config(key = "on.demand.volume.bytes.per.second",
+ type = ConfigType.LONG,
+ defaultValue = "5242880",
+ tags = {ConfigTag.STORAGE},
+ description = "Config parameter to throttle I/O bandwidth used"
+ + " by on demand scanner per volume.")
Review Comment:
```suggestion
+ " by the on demand container scanner per volume.")
```
We can update the background scanner description too to clarify it is
separate from the volume scanner.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java:
##########
@@ -121,111 +119,126 @@ public static void shutdown() throws IOException {
}
}
+ //This test performs 2 separate tests because creating
+ // and running a cluster is expensive.
@Test
- public void testOpenContainerIntegrity() throws Exception {
+ public void testScannersMarkContainerUnhealthy() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
- Instant testStartTime = Instant.now();
-
- String value = "sample value";
+ String value = "sample key value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
- for (int i = 0; i < 10; i++) {
- String keyName = UUID.randomUUID().toString();
-
- OzoneOutputStream out = bucket.createKey(keyName,
- value.getBytes(UTF_8).length, RATIS,
- ONE, new HashMap<>());
- out.write(value.getBytes(UTF_8));
- out.close();
- OzoneKey key = bucket.getKey(keyName);
- Assert.assertEquals(keyName, key.getName());
- OzoneInputStream is = bucket.readKey(keyName);
- byte[] fileContent = new byte[value.getBytes(UTF_8).length];
- is.read(fileContent);
- Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
- keyName, RATIS,
- ONE));
- Assert.assertEquals(value, new String(fileContent, UTF_8));
- Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
- Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
- }
-
+ String keyNameInClosedContainer = "keyNameInClosedContainer";
+ OzoneOutputStream key = createKey(volumeName, bucketName,
+ keyNameInClosedContainer);
+ // write data more than 1 chunk
+ int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);
+ byte[] data = ContainerTestHelper
+ .getFixedLengthString(value, sizeLargerThanOneChunk)
+ .getBytes(UTF_8);
+ key.write(data);
+
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ TestHelper.waitForContainerClose(key, cluster);
+ key.flush();
+ key.close();
Review Comment:
Is this the correct order, closing the container before flushing the key?
Seems like it should be the other way around.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java:
##########
@@ -307,29 +307,27 @@ private void buildContainerSet() {
private void startContainerScrub() {
ContainerScannerConfiguration c = config.getObject(
ContainerScannerConfiguration.class);
- boolean enabled = c.isEnabled();
-
- if (!enabled) {
+ if (!c.isEnabled()) {
LOG.info("Background container scanner has been disabled.");
Review Comment:
Can we just update the log message to reflect this?
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.verification.VerificationMode;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+
+/**
+ * Unit tests for the on-demand container scanner.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestOnDemandContainerScanner {
+
+ private final AtomicLong containerIdSeq = new AtomicLong(100);
+
+ @Mock
+ private Container<ContainerData> healthy;
+
+ @Mock
+ private Container<ContainerData> openContainer;
+
+ @Mock
+ private Container<ContainerData> corruptData;
+
+ private ContainerScannerConfiguration conf;
+ private ContainerController controller;
+
+ @Before
+ public void setup() {
+ conf = newInstanceOf(ContainerScannerConfiguration.class);
+ conf.setMetadataScanInterval(0);
+ conf.setDataScanInterval(0);
+ controller = mockContainerController();
+ }
+
+ @After
+ public void tearDown() {
+ OnDemandContainerScanner.shutdown();
+ }
+
+ @Test
+ public void testOnDemandContainerScanner() throws Exception {
+ //Without initialization,
+ // there shouldn't be interaction with containerController
+ OnDemandContainerScanner.scanContainer(corruptData);
+ Mockito.verifyZeroInteractions(controller);
+ OnDemandContainerScanner.init(conf, controller);
+ testContainerMarkedUnhealthy(healthy, never());
+ testContainerMarkedUnhealthy(corruptData, atLeastOnce());
+ testContainerMarkedUnhealthy(openContainer, never());
+ }
+
+ @Test
+ public void testContainerScannerMultipleInitsAndShutdowns() throws Exception
{
+ OnDemandContainerScanner.init(conf, controller);
+ OnDemandContainerScanner.init(conf, controller);
+ OnDemandContainerScanner.shutdown();
+ OnDemandContainerScanner.shutdown();
+ //There shouldn't be an interaction after shutdown:
+ testContainerMarkedUnhealthy(corruptData, never());
+ }
+
+ @Test
+ public void testSameContainerQueuedMultipleTimes() {
Review Comment:
The actual asserts in this test may not run due to execution order. Since
the containers are mocked, can we ensure an ongoing scan by scanning a mocked
container whose `scanData` method blocks with a countdown latch or similar
construct? See `UpgradeTestUtils#newPausingFinalizationExecutor` for an example
of using this approach to halt code execution until the test can run some
checks or setup. Then we can run the queue duplicate checks in this test, and
we can also check that `Container#scanData` is only called on the first mock
queued.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java:
##########
@@ -45,12 +45,18 @@ public class ContainerScannerConfiguration {
"hdds.container.scrub.data.scan.interval";
public static final String VOLUME_BYTES_PER_SECOND_KEY =
"hdds.container.scrub.volume.bytes.per.second";
+ public static final String ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY =
+ "hdds.container.scrub.on.demand.volume.bytes.per.second";
public static final long METADATA_SCAN_INTERVAL_DEFAULT =
Duration.ofHours(3).toMillis();
public static final long DATA_SCAN_INTERVAL_DEFAULT =
Duration.ofDays(7).toMillis();
- public static final long BANDWIDTH_PER_VOLUME_DEFAULT = 1048576; // 1MB
+
+ private static final long BYTES_IN_MEGABYTES = 1048576L;
Review Comment:
We can use `OzoneConsts.MB` instead of defining our own constant here. Might
be useful for other bandwidth configurations in this class as well.
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java:
##########
@@ -157,4 +166,32 @@ public static void createDbInstancesForTestIfNeeded(
null, null);
}
}
+
+ public static void waitForScanToFinish(Future<?> scanResult) {
+ try {
+ scanResult.get();
+ } catch (Exception e) {
+ throw new RuntimeException("Error while waiting" +
+ " for on-demand scan to finish");
+ }
+ }
Review Comment:
This method isn't totally necessary. If you look at other test examples, they
usually just put `throws Exception` on each test method since it is assumed
that any exceptions that make it to the top level indicate a test failure.
Ideally the container scanner's logging and exceptions should be descriptive
enough without wrapping them with an extra error message.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java:
##########
@@ -121,111 +119,126 @@ public static void shutdown() throws IOException {
}
}
+ //This test performs 2 separate tests because creating
+ // and running a cluster is expensive.
@Test
- public void testOpenContainerIntegrity() throws Exception {
+ public void testScannersMarkContainerUnhealthy() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
- Instant testStartTime = Instant.now();
-
- String value = "sample value";
+ String value = "sample key value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
- for (int i = 0; i < 10; i++) {
- String keyName = UUID.randomUUID().toString();
-
- OzoneOutputStream out = bucket.createKey(keyName,
- value.getBytes(UTF_8).length, RATIS,
- ONE, new HashMap<>());
- out.write(value.getBytes(UTF_8));
- out.close();
- OzoneKey key = bucket.getKey(keyName);
- Assert.assertEquals(keyName, key.getName());
- OzoneInputStream is = bucket.readKey(keyName);
- byte[] fileContent = new byte[value.getBytes(UTF_8).length];
- is.read(fileContent);
- Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
- keyName, RATIS,
- ONE));
- Assert.assertEquals(value, new String(fileContent, UTF_8));
- Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
- Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
- }
-
+ String keyNameInClosedContainer = "keyNameInClosedContainer";
+ OzoneOutputStream key = createKey(volumeName, bucketName,
+ keyNameInClosedContainer);
+ // write data more than 1 chunk
+ int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);
+ byte[] data = ContainerTestHelper
+ .getFixedLengthString(value, sizeLargerThanOneChunk)
+ .getBytes(UTF_8);
+ key.write(data);
+
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ TestHelper.waitForContainerClose(key, cluster);
+ key.flush();
+ key.close();
+
+ String keyNameInOpenContainer = "keyNameInOpenContainer";
+ OzoneOutputStream key2 = createKey(volumeName, bucketName,
+ keyNameInOpenContainer);
+ key2.write(data);
+ key2.close();
// wait for the container report to propagate to SCM
Thread.sleep(5000);
-
Assert.assertEquals(1, cluster.getHddsDatanodes().size());
HddsDatanodeService dn = cluster.getHddsDatanodes().get(0);
OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
- ContainerSet cs = oc.getContainerSet();
- Container c = cs.getContainerIterator().next();
+ ContainerSet containerSet = oc.getContainerSet();
+ //Given an open and a closed container
+ Assert.assertTrue(containerSet.containerCount() > 1);
+ Container<?> openContainer = getContainerInState(containerSet, OPEN);
+ Container<?> closedContainer = getContainerInState(containerSet, CLOSED);
- Assert.assertTrue(cs.containerCount() > 0);
-
- // delete the chunks directory.
- File chunksDir = new File(c.getContainerData().getContainerPath(),
- "chunks");
- deleteDirectory(chunksDir);
- Assert.assertFalse(chunksDir.exists());
+ //When deleting their metadata to make them unhealthy and scanning them
+ deleteChunksDirForContainer(openContainer);
+ deleteChunksDirForContainer(closedContainer);
ContainerScannerConfiguration conf = ozoneConfig.getObject(
ContainerScannerConfiguration.class);
ContainerMetadataScanner sb = new ContainerMetadataScanner(conf,
oc.getController());
- sb.scanContainer(c);
-
+ //Scan the open container and trigger on-demand scan for the closed one
+ sb.scanContainer(openContainer);
+ tryReadKeyWithMissingChunksDir(bucket, keyNameInClosedContainer);
// wait for the incremental container report to propagate to SCM
Thread.sleep(5000);
ContainerManager cm = cluster.getStorageContainerManager()
.getContainerManager();
- Set<ContainerReplica> replicas = cm.getContainerReplicas(
- ContainerID.valueOf(c.getContainerData().getContainerID()));
- Assert.assertEquals(1, replicas.size());
- ContainerReplica r = replicas.iterator().next();
- Assert.assertEquals(StorageContainerDatanodeProtocolProtos.
- ContainerReplicaProto.State.UNHEALTHY, r.getState());
+ ContainerReplica openContainerReplica = getContainerReplica(
+ cm, openContainer.getContainerData().getContainerID());
+ ContainerReplica closedContainerReplica = getContainerReplica(
+ cm, closedContainer.getContainerData().getContainerID());
+ //Then both containers are marked unhealthy
+ Assert.assertEquals(State.UNHEALTHY, openContainerReplica.getState());
+ Assert.assertEquals(State.UNHEALTHY, closedContainerReplica.getState());
+ }
+
+ private ContainerReplica getContainerReplica(
+ ContainerManager cm, long containerId) throws ContainerNotFoundException
{
+ Set<ContainerReplica> containerReplicas = cm.getContainerReplicas(
+ ContainerID.valueOf(
+ containerId));
+ Assert.assertEquals(1, containerReplicas.size());
+ return containerReplicas.iterator().next();
+ }
+
+ //ignore the result of the key read because it is expected to fail
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private void tryReadKeyWithMissingChunksDir(
+ OzoneBucket bucket, String keyNameInClosedContainer) throws IOException {
+ try (OzoneInputStream key = bucket.readKey(keyNameInClosedContainer)) {
+ key.read();
+ } catch (StorageContainerException ignored) {
Review Comment:
Let's assert that the exception is thrown here.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java:
##########
@@ -121,111 +119,126 @@ public static void shutdown() throws IOException {
}
}
+ //This test performs 2 separate tests because creating
+ // and running a cluster is expensive.
@Test
- public void testOpenContainerIntegrity() throws Exception {
+ public void testScannersMarkContainerUnhealthy() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
- Instant testStartTime = Instant.now();
-
- String value = "sample value";
+ String value = "sample key value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
- for (int i = 0; i < 10; i++) {
- String keyName = UUID.randomUUID().toString();
-
- OzoneOutputStream out = bucket.createKey(keyName,
- value.getBytes(UTF_8).length, RATIS,
- ONE, new HashMap<>());
- out.write(value.getBytes(UTF_8));
- out.close();
- OzoneKey key = bucket.getKey(keyName);
- Assert.assertEquals(keyName, key.getName());
- OzoneInputStream is = bucket.readKey(keyName);
- byte[] fileContent = new byte[value.getBytes(UTF_8).length];
- is.read(fileContent);
- Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
- keyName, RATIS,
- ONE));
- Assert.assertEquals(value, new String(fileContent, UTF_8));
- Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
- Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
- }
-
+ String keyNameInClosedContainer = "keyNameInClosedContainer";
+ OzoneOutputStream key = createKey(volumeName, bucketName,
+ keyNameInClosedContainer);
+ // write data more than 1 chunk
+ int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);
Review Comment:
Why do we need more than one chunk?
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+ private static volatile OnDemandContainerScanner instance;
+
+ private final ExecutorService scanExecutor;
+ private final ContainerController containerController;
+ private final DataTransferThrottler throttler;
+ private final Canceler canceler;
+ private final ConcurrentHashMap
+ .KeySetView<Long, Boolean> containerRescheduleCheckSet;
+ private final OnDemandScannerMetrics metrics;
+
+ private OnDemandContainerScanner(
+ ContainerScannerConfiguration conf, ContainerController controller) {
+ containerController = controller;
+ throttler = new DataTransferThrottler(
+ conf.getOnDemandBandwidthPerVolume());
+ canceler = new Canceler();
+ metrics = OnDemandScannerMetrics.create();
+ scanExecutor = Executors.newSingleThreadExecutor();
+ containerRescheduleCheckSet = ConcurrentHashMap.newKeySet();
+ }
+
+ public static synchronized void init(
+ ContainerScannerConfiguration conf, ContainerController controller) {
+ if (instance != null) {
+ LOG.warn("Trying to initialize on demand scanner" +
+ " a second time on a datanode.");
+ return;
+ }
+ instance = new OnDemandContainerScanner(conf, controller);
+ }
+
+ public static Optional<Future<?>> scanContainer(Container<?> container) {
+ if (instance == null || !container.shouldScanData()) {
+ return Optional.empty();
+ }
+ Future<?> resultFuture = null;
+ long containerId = container.getContainerData().getContainerID();
+ if (addContainerToScheduledContainers(containerId)) {
+ resultFuture = instance.scanExecutor.submit(() -> {
+ removeContainerFromScheduledContainers(containerId);
+ if (container.shouldScanData()) {
+ performOnDemandScan(container);
+ }
+ });
+ }
+ return Optional.ofNullable(resultFuture);
+ }
+
+ private static boolean addContainerToScheduledContainers(long containerId) {
+ return instance.containerRescheduleCheckSet.add(containerId);
+ }
+
+ private static void removeContainerFromScheduledContainers(
+ long containerId) {
+ instance.containerRescheduleCheckSet.remove(containerId);
+ }
+
+ private static void performOnDemandScan(Container<?> container) {
+ long containerId = container.getContainerData().getContainerID();
+ try {
+ ContainerData containerData = container.getContainerData();
+ logScanStart(containerData);
+ if (container.scanData(instance.throttler, instance.canceler)) {
+ Instant now = Instant.now();
+ logScanCompleted(containerData, now);
+ instance.containerController.updateDataScanTimestamp(containerId, now);
+ } else {
+ instance.containerController.markContainerUnhealthy(containerId);
+ instance.metrics.incNumUnHealthyContainers();
+ }
+ instance.metrics.incNumContainersScanned();
+ } catch (IOException e) {
+ LOG.warn("Unexpected exception while scanning container "
+ + containerId, e);
+ }
+ }
+
+ private static void logScanStart(ContainerData containerData) {
+ if (LOG.isDebugEnabled()) {
+ Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
+ Object lastScanTime = scanTimestamp.map(ts -> "at " +
ts).orElse("never");
+ LOG.debug("Scanning container {}, last scanned {}",
+ containerData.getContainerID(), lastScanTime);
+ }
+ }
+
+ private static void logScanCompleted(
+ ContainerData containerData, Instant timestamp) {
+ LOG.debug("Completed scan of container {} at {}",
+ containerData.getContainerID(), timestamp);
+ }
+
+ public static OnDemandScannerMetrics getMetrics() {
+ return instance.metrics;
+ }
+
+ public static synchronized void shutdown() {
+ if (instance == null) {
+ return;
+ }
+ instance.shutdownScanner();
+ }
+
+ private synchronized void shutdownScanner() {
+ instance = null;
+ metrics.unregister();
+ if (!scanExecutor.isShutdown()) {
+ scanExecutor.shutdown();
+ }
+ try {
+ long timeoutSeconds = 5;
+ if (!scanExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
+ LOG.warn("On demand scanner shut down forcefully after {} seconds",
+ timeoutSeconds);
+ scanExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("On demand scanner interrupted while waiting for shut down.");
+ scanExecutor.shutdownNow();
+ throw new RuntimeException(e);
Review Comment:
I think we should set the interrupt flag and return instead of throwing. See
`AbstractContainerScanner#shutdown` for reference.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+ private static volatile OnDemandContainerScanner instance;
+
+ private final ExecutorService scanExecutor;
+ private final ContainerController containerController;
+ private final DataTransferThrottler throttler;
+ private final Canceler canceler;
+ private final ConcurrentHashMap
+ .KeySetView<Long, Boolean> containerRescheduleCheckSet;
+ private final OnDemandScannerMetrics metrics;
+
+ private OnDemandContainerScanner(
+ ContainerScannerConfiguration conf, ContainerController controller) {
+ containerController = controller;
+ throttler = new DataTransferThrottler(
+ conf.getOnDemandBandwidthPerVolume());
+ canceler = new Canceler();
+ metrics = OnDemandScannerMetrics.create();
+ scanExecutor = Executors.newSingleThreadExecutor();
+ containerRescheduleCheckSet = ConcurrentHashMap.newKeySet();
+ }
+
+ public static synchronized void init(
+ ContainerScannerConfiguration conf, ContainerController controller) {
+ if (instance != null) {
+ LOG.warn("Trying to initialize on demand scanner" +
+ " a second time on a datanode.");
+ return;
+ }
+ instance = new OnDemandContainerScanner(conf, controller);
+ }
+
+ public static Optional<Future<?>> scanContainer(Container<?> container) {
+ if (instance == null || !container.shouldScanData()) {
+ return Optional.empty();
+ }
+ Future<?> resultFuture = null;
+ long containerId = container.getContainerData().getContainerID();
+ if (addContainerToScheduledContainers(containerId)) {
+ resultFuture = instance.scanExecutor.submit(() -> {
+ removeContainerFromScheduledContainers(containerId);
+ if (container.shouldScanData()) {
+ performOnDemandScan(container);
+ }
+ });
+ }
+ return Optional.ofNullable(resultFuture);
+ }
+
+ private static boolean addContainerToScheduledContainers(long containerId) {
+ return instance.containerRescheduleCheckSet.add(containerId);
+ }
+
+ private static void removeContainerFromScheduledContainers(
+ long containerId) {
+ instance.containerRescheduleCheckSet.remove(containerId);
+ }
+
+ private static void performOnDemandScan(Container<?> container) {
+ long containerId = container.getContainerData().getContainerID();
+ try {
+ ContainerData containerData = container.getContainerData();
+ logScanStart(containerData);
+ if (container.scanData(instance.throttler, instance.canceler)) {
+ Instant now = Instant.now();
+ logScanCompleted(containerData, now);
+ instance.containerController.updateDataScanTimestamp(containerId, now);
+ } else {
+ instance.containerController.markContainerUnhealthy(containerId);
+ instance.metrics.incNumUnHealthyContainers();
+ }
+ instance.metrics.incNumContainersScanned();
+ } catch (IOException e) {
+ LOG.warn("Unexpected exception while scanning container "
+ + containerId, e);
+ }
+ }
+
+ private static void logScanStart(ContainerData containerData) {
+ if (LOG.isDebugEnabled()) {
+ Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
+ Object lastScanTime = scanTimestamp.map(ts -> "at " +
ts).orElse("never");
+ LOG.debug("Scanning container {}, last scanned {}",
+ containerData.getContainerID(), lastScanTime);
+ }
+ }
+
+ private static void logScanCompleted(
+ ContainerData containerData, Instant timestamp) {
+ LOG.debug("Completed scan of container {} at {}",
+ containerData.getContainerID(), timestamp);
+ }
+
+ public static OnDemandScannerMetrics getMetrics() {
+ return instance.metrics;
+ }
+
+ public static synchronized void shutdown() {
+ if (instance == null) {
+ return;
+ }
+ instance.shutdownScanner();
+ }
+
+ private synchronized void shutdownScanner() {
Review Comment:
This should probably invoke the canceler like the shutdown of the background
data scanner does.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]