http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java deleted file mode 100644 index a599f72..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.keyvalue; - -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; - -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; -import org.apache.commons.compress.compressors.CompressorException; -import org.apache.commons.compress.compressors.CompressorInputStream; -import org.apache.commons.compress.compressors.CompressorStreamFactory; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * Test the tar/untar for a given container. - */ -public class TestTarContainerPacker { - - private static final String TEST_DB_FILE_NAME = "test1"; - - private static final String TEST_DB_FILE_CONTENT = "test1"; - - private static final String TEST_CHUNK_FILE_NAME = "chunk1"; - - private static final String TEST_CHUNK_FILE_CONTENT = "This is a chunk"; - - private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor"; - - private ContainerPacker packer = new TarContainerPacker(); - - private static final Path SOURCE_CONTAINER_ROOT = - Paths.get("target/test/data/packer-source-dir"); - - private static final Path DEST_CONTAINER_ROOT = - Paths.get("target/test/data/packer-dest-dir"); - - @BeforeClass - public static void init() throws IOException { - initDir(SOURCE_CONTAINER_ROOT); - initDir(DEST_CONTAINER_ROOT); - } - - private static void initDir(Path path) throws IOException { - if (path.toFile().exists()) { - FileUtils.deleteDirectory(path.toFile()); - } - path.toFile().mkdirs(); - } - - private KeyValueContainerData createContainer(long id, Path dir, - OzoneConfiguration conf) throws IOException { - - Path containerDir = dir.resolve("container" + id); - Path dbDir = containerDir.resolve("db"); - Path dataDir = containerDir.resolve("data"); - Files.createDirectories(dbDir); - Files.createDirectories(dataDir); - - KeyValueContainerData containerData = new KeyValueContainerData(id, -1); - containerData.setChunksPath(dataDir.toString()); - containerData.setMetadataPath(dbDir.getParent().toString()); - containerData.setDbFile(dbDir.toFile()); - - - return containerData; - } - - @Test - public void pack() throws IOException, CompressorException { - - //GIVEN - OzoneConfiguration conf = new OzoneConfiguration(); - - KeyValueContainerData sourceContainerData = - createContainer(1L, SOURCE_CONTAINER_ROOT, conf); - - KeyValueContainer sourceContainer = - new KeyValueContainer(sourceContainerData, conf); - - //sample db file in the metadata directory - try (FileWriter writer = new FileWriter( - sourceContainerData.getDbFile().toPath() - .resolve(TEST_DB_FILE_NAME) - .toFile())) { - IOUtils.write(TEST_DB_FILE_CONTENT, writer); - } - - //sample chunk file in the chunk directory - try (FileWriter writer = new FileWriter( - Paths.get(sourceContainerData.getChunksPath()) - .resolve(TEST_CHUNK_FILE_NAME) - .toFile())) { - IOUtils.write(TEST_CHUNK_FILE_CONTENT, writer); - } - - //sample container descriptor file - try (FileWriter writer = new FileWriter( - sourceContainer.getContainerFile())) { - IOUtils.write(TEST_DESCRIPTOR_FILE_CONTENT, writer); - } - - Path targetFile = - SOURCE_CONTAINER_ROOT.getParent().resolve("container.tar.gz"); - - //WHEN: pack it - try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) { - packer.pack(sourceContainer, output); - } - - //THEN: check the result - try (FileInputStream input = new FileInputStream(targetFile.toFile())) { - CompressorInputStream uncompressed = new CompressorStreamFactory() - .createCompressorInputStream(CompressorStreamFactory.GZIP, input); - TarArchiveInputStream tarStream = new TarArchiveInputStream(uncompressed); - - TarArchiveEntry entry; - Map<String, TarArchiveEntry> entries = new HashMap<>(); - while ((entry = tarStream.getNextTarEntry()) != null) { - entries.put(entry.getName(), entry); - } - - Assert.assertTrue( - entries.containsKey("container.yaml")); - - } - - //read the container descriptor only - try (FileInputStream input = new FileInputStream(targetFile.toFile())) { - String containerYaml = new String(packer.unpackContainerDescriptor(input), - Charset.forName(StandardCharsets.UTF_8.name())); - Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, containerYaml); - } - - KeyValueContainerData destinationContainerData = - createContainer(2L, DEST_CONTAINER_ROOT, conf); - - KeyValueContainer destinationContainer = - new KeyValueContainer(destinationContainerData, conf); - - String descriptor = ""; - - //unpackContainerData - try (FileInputStream input = new FileInputStream(targetFile.toFile())) { - descriptor = - new String(packer.unpackContainerData(destinationContainer, input), - Charset.forName(StandardCharsets.UTF_8.name())); - } - - assertExampleMetadataDbIsGood( - destinationContainerData.getDbFile().toPath()); - assertExampleChunkFileIsGood( - Paths.get(destinationContainerData.getChunksPath())); - Assert.assertFalse( - "Descriptor file should not been exctarcted by the " - + "unpackContainerData Call", - destinationContainer.getContainerFile().exists()); - Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor); - - } - - - private void assertExampleMetadataDbIsGood(Path dbPath) - throws IOException { - - Path dbFile = dbPath.resolve(TEST_DB_FILE_NAME); - - Assert.assertTrue( - "example DB file is missing after pack/unpackContainerData: " + dbFile, - Files.exists(dbFile)); - - try (FileInputStream testFile = new FileInputStream(dbFile.toFile())) { - List<String> strings = IOUtils - .readLines(testFile, Charset.forName(StandardCharsets.UTF_8.name())); - Assert.assertEquals(1, strings.size()); - Assert.assertEquals(TEST_DB_FILE_CONTENT, strings.get(0)); - } - } - - private void assertExampleChunkFileIsGood(Path chunkDirPath) - throws IOException { - - Path chunkFile = chunkDirPath.resolve(TEST_CHUNK_FILE_NAME); - - Assert.assertTrue( - "example chunk file is missing after pack/unpackContainerData: " - + chunkFile, - Files.exists(chunkFile)); - - try (FileInputStream testFile = new FileInputStream(chunkFile.toFile())) { - List<String> strings = IOUtils - .readLines(testFile, Charset.forName(StandardCharsets.UTF_8.name())); - Assert.assertEquals(1, strings.size()); - Assert.assertEquals(TEST_CHUNK_FILE_CONTENT, strings.get(0)); - } - } - -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java deleted file mode 100644 index fea126b..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.ozoneimpl; - - -import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; -import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import java.util.Random; -import java.util.UUID; - - -import static org.junit.Assert.assertEquals; - -/** - * This class is used to test OzoneContainer. - */ -public class TestOzoneContainer { - - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - - - private OzoneConfiguration conf; - private String scmId = UUID.randomUUID().toString(); - private VolumeSet volumeSet; - private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy; - private KeyValueContainerData keyValueContainerData; - private KeyValueContainer keyValueContainer; - private final DatanodeDetails datanodeDetails = createDatanodeDetails(); - - @Before - public void setUp() throws Exception { - conf = new OzoneConfiguration(); - conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, folder.getRoot() - .getAbsolutePath()); - conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, - folder.newFolder().getAbsolutePath()); - } - - @Test - public void testBuildContainerMap() throws Exception { - volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); - volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy(); - - // Format the volumes - for (HddsVolume volume : volumeSet.getVolumesList()) { - volume.format(UUID.randomUUID().toString()); - } - - // Add containers to disk - for (int i=0; i<10; i++) { - keyValueContainerData = new KeyValueContainerData(i, - (long) StorageUnit.GB.toBytes(1)); - keyValueContainer = new KeyValueContainer( - keyValueContainerData, conf); - keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); - } - - // When OzoneContainer is started, the containers from disk should be - // loaded into the containerSet. - OzoneContainer ozoneContainer = new - OzoneContainer(datanodeDetails, conf, null); - ContainerSet containerset = ozoneContainer.getContainerSet(); - assertEquals(10, containerset.containerCount()); - } - - - private DatanodeDetails createDatanodeDetails() { - Random random = new Random(); - String ipAddress = - random.nextInt(256) + "." + random.nextInt(256) + "." + random - .nextInt(256) + "." + random.nextInt(256); - - String uuid = UUID.randomUUID().toString(); - String hostName = uuid; - DatanodeDetails.Port containerPort = DatanodeDetails.newPort( - DatanodeDetails.Port.Name.STANDALONE, 0); - DatanodeDetails.Port ratisPort = DatanodeDetails.newPort( - DatanodeDetails.Port.Name.RATIS, 0); - DatanodeDetails.Port restPort = DatanodeDetails.newPort( - DatanodeDetails.Port.Name.REST, 0); - DatanodeDetails.Builder builder = DatanodeDetails.newBuilder(); - builder.setUuid(uuid) - .setHostName("localhost") - .setIpAddress(ipAddress) - .addPort(containerPort) - .addPort(ratisPort) - .addPort(restPort); - return builder.build(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java deleted file mode 100644 index d433319..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.replication; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; - -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -/** - * Test the replication supervisor. - */ -public class TestReplicationSupervisor { - - private OzoneConfiguration conf = new OzoneConfiguration(); - - @Test - public void normal() { - //GIVEN - ContainerSet set = new ContainerSet(); - - FakeReplicator replicator = new FakeReplicator(set); - ReplicationSupervisor supervisor = - new ReplicationSupervisor(set, replicator, 5); - - List<DatanodeDetails> datanodes = IntStream.range(1, 3) - .mapToObj(v -> Mockito.mock(DatanodeDetails.class)) - .collect(Collectors.toList()); - - try { - supervisor.start(); - //WHEN - supervisor.addTask(new ReplicationTask(1L, datanodes)); - supervisor.addTask(new ReplicationTask(1L, datanodes)); - supervisor.addTask(new ReplicationTask(1L, datanodes)); - supervisor.addTask(new ReplicationTask(2L, datanodes)); - supervisor.addTask(new ReplicationTask(2L, datanodes)); - supervisor.addTask(new ReplicationTask(3L, datanodes)); - try { - Thread.sleep(300); - } catch (InterruptedException e) { - e.printStackTrace(); - } - //THEN - System.out.println(replicator.replicated.get(0)); - - Assert - .assertEquals(3, replicator.replicated.size()); - - } finally { - supervisor.stop(); - } - } - - @Test - public void duplicateMessageAfterAWhile() throws InterruptedException { - //GIVEN - ContainerSet set = new ContainerSet(); - - FakeReplicator replicator = new FakeReplicator(set); - ReplicationSupervisor supervisor = - new ReplicationSupervisor(set, replicator, 2); - - List<DatanodeDetails> datanodes = IntStream.range(1, 3) - .mapToObj(v -> Mockito.mock(DatanodeDetails.class)) - .collect(Collectors.toList()); - - try { - supervisor.start(); - //WHEN - supervisor.addTask(new ReplicationTask(1L, datanodes)); - Thread.sleep(400); - supervisor.addTask(new ReplicationTask(1L, datanodes)); - Thread.sleep(300); - - //THEN - System.out.println(replicator.replicated.get(0)); - - Assert - .assertEquals(1, replicator.replicated.size()); - - //the last item is still in the queue as we cleanup the queue during the - // selection - Assert.assertEquals(1, supervisor.getQueueSize()); - - } finally { - supervisor.stop(); - } - } - - private class FakeReplicator implements ContainerReplicator { - - private List<ReplicationTask> replicated = new ArrayList<>(); - - private ContainerSet containerSet; - - FakeReplicator(ContainerSet set) { - this.containerSet = set; - } - - @Override - public void replicate(ReplicationTask task) { - KeyValueContainerData kvcd = - new KeyValueContainerData(task.getContainerId(), 100L); - KeyValueContainer kvc = - new KeyValueContainer(kvcd, conf); - try { - //download is slow - Thread.sleep(100); - replicated.add(task); - containerSet.addContainer(kvc); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java deleted file mode 100644 index 5c905e0..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -/** - * Tests for the container replication. - */ -package org.apache.hadoop.ozone.container.replication; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java deleted file mode 100644 index 115b5e2..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.testutils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.keyvalue.statemachine.background - .BlockDeletingService; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * A test class implementation for {@link BlockDeletingService}. - */ -public class BlockDeletingServiceTestImpl - extends BlockDeletingService { - - // the service timeout - private static final int SERVICE_TIMEOUT_IN_MILLISECONDS = 0; - - // tests only - private CountDownLatch latch; - private Thread testingThread; - private AtomicInteger numOfProcessed = new AtomicInteger(0); - - public BlockDeletingServiceTestImpl(ContainerSet containerSet, - int serviceInterval, Configuration conf) { - super(containerSet, serviceInterval, SERVICE_TIMEOUT_IN_MILLISECONDS, - TimeUnit.MILLISECONDS, conf); - } - - @VisibleForTesting - public void runDeletingTasks() { - if (latch.getCount() > 0) { - this.latch.countDown(); - } else { - throw new IllegalStateException("Count already reaches zero"); - } - } - - @VisibleForTesting - public boolean isStarted() { - return latch != null && testingThread.isAlive(); - } - - public int getTimesOfProcessed() { - return numOfProcessed.get(); - } - - // Override the implementation to start a single on-call control thread. - @Override public void start() { - PeriodicalTask svc = new PeriodicalTask(); - // In test mode, relies on a latch countdown to runDeletingTasks tasks. - Runnable r = () -> { - while (true) { - latch = new CountDownLatch(1); - try { - latch.await(); - } catch (InterruptedException e) { - break; - } - Future<?> future = this.getExecutorService().submit(svc); - try { - // for tests, we only wait for 3s for completion - future.get(3, TimeUnit.SECONDS); - numOfProcessed.incrementAndGet(); - } catch (Exception e) { - return; - } - } - }; - - testingThread = new ThreadFactoryBuilder() - .setDaemon(true) - .build() - .newThread(r); - testingThread.start(); - } - - @Override - public void shutdown() { - testingThread.interrupt(); - super.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/package-info.java deleted file mode 100644 index 4e8a90b..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.container.testutils; -// Helper classes for ozone and container tests. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/resources/additionalfields.container ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/resources/additionalfields.container b/hadoop-hdds/container-service/src/test/resources/additionalfields.container deleted file mode 100644 index fff5304..0000000 --- a/hadoop-hdds/container-service/src/test/resources/additionalfields.container +++ /dev/null @@ -1,12 +0,0 @@ -!<KeyValueContainerData> -containerDBType: RocksDB -chunksPath: /hdds/current/aed-fg4-hji-jkl/containerDir0/1 -containerID: 9223372036854775807 -containerType: KeyValueContainer -metadataPath: /hdds/current/aed-fg4-hji-jkl/containerDir0/1 -layOutVersion: 1 -maxSize: 5368709120 -metadata: {OWNER: ozone, VOLUME: hdfs} -state: CLOSED -aclEnabled: true -checksum: c5b5373b8755c4e7199478dcaded9d996f9aca089704e08950259cdb0f290680 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/resources/incorrect.checksum.container ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/resources/incorrect.checksum.container b/hadoop-hdds/container-service/src/test/resources/incorrect.checksum.container deleted file mode 100644 index d06ba57..0000000 --- a/hadoop-hdds/container-service/src/test/resources/incorrect.checksum.container +++ /dev/null @@ -1,11 +0,0 @@ -!<KeyValueContainerData> -containerDBType: RocksDB -chunksPath: /hdds/current/aed-fg4-hji-jkl/containerdir0/1 -containerID: 9223372036854775807 -containerType: KeyValueContainer -metadataPath: /hdds/current/aed-fg4-hji-jkl/containerdir0/1 -layOutVersion: 1 -maxSize: 5368709120 -metadata: {OWNER: ozone, VOLUME: hdfs} -state: OPEN -checksum: 08bc9d390f9183aeed3cf33c789e2a07310bba60f3cf55941caccc939db8670f \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/resources/incorrect.container ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/resources/incorrect.container b/hadoop-hdds/container-service/src/test/resources/incorrect.container deleted file mode 100644 index 0053ab2..0000000 --- a/hadoop-hdds/container-service/src/test/resources/incorrect.container +++ /dev/null @@ -1,11 +0,0 @@ -!<KeyValueContainerData> -containerDBType: RocksDB -chunksPath: /hdds/current/aed-fg4-hji-jkl/containerDir0/1 -containerID: 9223372036854775807 -containerType: KeyValueContainer -metadataPath: /hdds/current/aed-fg4-hji-jkl/containerDir0/1 -layOutVersion: 1 -maxSize: 5368709120 -metadata: {OWNER: ozone, VOLUME: hdfs} -state: INVALID -checksum: 08bc9d390f9183aeed3cf33c789e2a07310bba60f3cf55941caccc939db8670f \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/resources/log4j.properties b/hadoop-hdds/container-service/src/test/resources/log4j.properties deleted file mode 100644 index bb5cbe5..0000000 --- a/hadoop-hdds/container-service/src/test/resources/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# log4j configuration used during build and unit tests - -log4j.rootLogger=INFO,stdout -log4j.threshold=ALL -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/README.md ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/README.md b/hadoop-hdds/framework/README.md deleted file mode 100644 index 0eda3f5..0000000 --- a/hadoop-hdds/framework/README.md +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -# Server framework for HDDS/Ozone - -This project contains generic utilities and resources for all the HDDS/Ozone -server-side components. - -The project is shared between the server/service projects but not with the -client packages. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml deleted file mode 100644 index 511f321..0000000 --- a/hadoop-hdds/framework/pom.xml +++ /dev/null @@ -1,69 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 -http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdds</artifactId> - <version>0.3.0-SNAPSHOT</version> - </parent> - <artifactId>hadoop-hdds-server-framework</artifactId> - <version>0.3.0-SNAPSHOT</version> - <description>Apache Hadoop Distributed Data Store Server Framework</description> - <name>Apache Hadoop HDDS Server Framework</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdds-common</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <id>copy web resources</id> - <phase>compile</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <target> - <copy toDir="${project.build.directory}/classes/webapps/static"> - <fileset - dir="${basedir}/../../hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/static"> - </fileset> - </copy> - </target> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/BaseHttpServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/BaseHttpServer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/BaseHttpServer.java deleted file mode 100644 index da5d8da..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/BaseHttpServer.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.server; - -import com.google.common.base.Optional; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdds.conf.HddsConfServlet; -import org.apache.hadoop.http.HttpConfig; -import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.net.NetUtils; -import org.eclipse.jetty.webapp.WebAppContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.http.HttpServlet; -import java.io.IOException; -import java.net.InetSocketAddress; - -import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys; -import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys; - -/** - * Base class for HTTP server of the Ozone related components. - */ -public abstract class BaseHttpServer { - - private static final Logger LOG = - LoggerFactory.getLogger(BaseHttpServer.class); - - private HttpServer2 httpServer; - private final Configuration conf; - - private InetSocketAddress httpAddress; - private InetSocketAddress httpsAddress; - - private HttpConfig.Policy policy; - - private String name; - - public BaseHttpServer(Configuration conf, String name) throws IOException { - this.name = name; - this.conf = conf; - if (isEnabled()) { - policy = DFSUtil.getHttpPolicy(conf); - if (policy.isHttpEnabled()) { - this.httpAddress = getHttpBindAddress(); - } - if (policy.isHttpsEnabled()) { - this.httpsAddress = getHttpsBindAddress(); - } - HttpServer2.Builder builder = null; - builder = DFSUtil.httpServerTemplateForNNAndJN(conf, this.httpAddress, - this.httpsAddress, name, getSpnegoPrincipal(), getKeytabFile()); - - final boolean xFrameEnabled = conf.getBoolean( - DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED, - DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED_DEFAULT); - - final String xFrameOptionValue = conf.getTrimmed( - DFSConfigKeys.DFS_XFRAME_OPTION_VALUE, - DFSConfigKeys.DFS_XFRAME_OPTION_VALUE_DEFAULT); - - builder.configureXFrame(xFrameEnabled).setXFrameOption(xFrameOptionValue); - - httpServer = builder.build(); - httpServer.addServlet("conf", "/conf", HddsConfServlet.class); - - } - - } - - /** - * Add a servlet to BaseHttpServer. - * - * @param servletName The name of the servlet - * @param pathSpec The path spec for the servlet - * @param clazz The servlet class - */ - protected void addServlet(String servletName, String pathSpec, - Class<? extends HttpServlet> clazz) { - httpServer.addServlet(servletName, pathSpec, clazz); - } - - /** - * Returns the WebAppContext associated with this HttpServer. - * - * @return WebAppContext - */ - protected WebAppContext getWebAppContext() { - return httpServer.getWebAppContext(); - } - - protected InetSocketAddress getBindAddress(String bindHostKey, - String addressKey, String bindHostDefault, int bindPortdefault) { - final Optional<String> bindHost = - getHostNameFromConfigKeys(conf, bindHostKey); - - final Optional<Integer> addressPort = - getPortNumberFromConfigKeys(conf, addressKey); - - final Optional<String> addresHost = - getHostNameFromConfigKeys(conf, addressKey); - - String hostName = bindHost.or(addresHost).or(bindHostDefault); - - return NetUtils.createSocketAddr( - hostName + ":" + addressPort.or(bindPortdefault)); - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the HTTPS web interface. - * - * @return Target InetSocketAddress for the Ozone HTTPS endpoint. - */ - public InetSocketAddress getHttpsBindAddress() { - return getBindAddress(getHttpsBindHostKey(), getHttpsAddressKey(), - getBindHostDefault(), getHttpsBindPortDefault()); - } - - /** - * Retrieve the socket address that should be used by clients to connect - * to the HTTP web interface. - * <p> - * * @return Target InetSocketAddress for the Ozone HTTP endpoint. - */ - public InetSocketAddress getHttpBindAddress() { - return getBindAddress(getHttpBindHostKey(), getHttpAddressKey(), - getBindHostDefault(), getHttpBindPortDefault()); - - } - - public void start() throws IOException { - if (httpServer != null && isEnabled()) { - httpServer.start(); - updateConnectorAddress(); - } - - } - - private boolean isEnabled() { - return conf.getBoolean(getEnabledKey(), true); - } - - public void stop() throws Exception { - if (httpServer != null) { - httpServer.stop(); - } - } - - /** - * Update the configured listen address based on the real port - * <p> - * (eg. replace :0 with real port) - */ - public void updateConnectorAddress() { - int connIdx = 0; - if (policy.isHttpEnabled()) { - httpAddress = httpServer.getConnectorAddress(connIdx++); - String realAddress = NetUtils.getHostPortString(httpAddress); - conf.set(getHttpAddressKey(), realAddress); - LOG.info( - String.format("HTTP server of %s is listening at http://%s", - name.toUpperCase(), realAddress)); - } - - if (policy.isHttpsEnabled()) { - httpsAddress = httpServer.getConnectorAddress(connIdx); - String realAddress = NetUtils.getHostPortString(httpsAddress); - conf.set(getHttpsAddressKey(), realAddress); - LOG.info( - String.format("HTTP server of %s is listening at https://%s", - name.toUpperCase(), realAddress)); - } - } - - public InetSocketAddress getHttpAddress() { - return httpAddress; - } - - public InetSocketAddress getHttpsAddress() { - return httpsAddress; - } - - protected abstract String getHttpAddressKey(); - - protected abstract String getHttpsAddressKey(); - - protected abstract String getHttpBindHostKey(); - - protected abstract String getHttpsBindHostKey(); - - protected abstract String getBindHostDefault(); - - protected abstract int getHttpBindPortDefault(); - - protected abstract int getHttpsBindPortDefault(); - - protected abstract String getKeytabFile(); - - protected abstract String getSpnegoPrincipal(); - - protected abstract String getEnabledKey(); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java deleted file mode 100644 index c6d85d8..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.server; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.http.client.methods.HttpRequestBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.net.InetSocketAddress; - -/** - * Generic utilities for all HDDS/Ozone servers. - */ -public final class ServerUtils { - - private static final Logger LOG = LoggerFactory.getLogger( - ServerUtils.class); - - private ServerUtils() { - } - - /** - * Checks that a given value is with a range. - * - * For example, sanitizeUserArgs(17, 3, 5, 10) - * ensures that 17 is greater/equal than 3 * 5 and less/equal to 3 * 10. - * - * @param valueTocheck - value to check - * @param baseValue - the base value that is being used. - * @param minFactor - range min - a 2 here makes us ensure that value - * valueTocheck is at least twice the baseValue. - * @param maxFactor - range max - * @return long - */ - public static long sanitizeUserArgs(long valueTocheck, long baseValue, - long minFactor, long maxFactor) - throws IllegalArgumentException { - if ((valueTocheck >= (baseValue * minFactor)) && - (valueTocheck <= (baseValue * maxFactor))) { - return valueTocheck; - } - String errMsg = String.format("%d is not within min = %d or max = " + - "%d", valueTocheck, baseValue * minFactor, baseValue * maxFactor); - throw new IllegalArgumentException(errMsg); - } - - - /** - * After starting an RPC server, updates configuration with the actual - * listening address of that server. The listening address may be different - * from the configured address if, for example, the configured address uses - * port 0 to request use of an ephemeral port. - * - * @param conf configuration to update - * @param rpcAddressKey configuration key for RPC server address - * @param addr configured address - * @param rpcServer started RPC server. - */ - public static InetSocketAddress updateRPCListenAddress( - OzoneConfiguration conf, String rpcAddressKey, - InetSocketAddress addr, RPC.Server rpcServer) { - return updateListenAddress(conf, rpcAddressKey, addr, - rpcServer.getListenerAddress()); - } - - - /** - * After starting an server, updates configuration with the actual - * listening address of that server. The listening address may be different - * from the configured address if, for example, the configured address uses - * port 0 to request use of an ephemeral port. - * - * @param conf configuration to update - * @param addressKey configuration key for RPC server address - * @param addr configured address - * @param listenAddr the real listening address. - */ - public static InetSocketAddress updateListenAddress(OzoneConfiguration conf, - String addressKey, InetSocketAddress addr, InetSocketAddress listenAddr) { - InetSocketAddress updatedAddr = new InetSocketAddress(addr.getHostString(), - listenAddr.getPort()); - conf.set(addressKey, - addr.getHostString() + ":" + listenAddr.getPort()); - return updatedAddr; - } - - - /** - * Releases a http connection if the request is not null. - * @param request - */ - public static void releaseConnection(HttpRequestBase request) { - if (request != null) { - request.releaseConnection(); - } - } - - - /** - * Checks and creates Ozone Metadir Path if it does not exist. - * - * @param conf - Configuration - * - * @return File MetaDir - */ - public static File getOzoneMetaDirPath(Configuration conf) { - String metaDirPath = conf.getTrimmed(OzoneConfigKeys - .OZONE_METADATA_DIRS); - Preconditions.checkNotNull(metaDirPath); - File dirPath = new File(metaDirPath); - if (!dirPath.exists() && !dirPath.mkdirs()) { - throw new IllegalArgumentException("Unable to create paths. Path: " + - dirPath); - } - return dirPath; - } - - public static void setOzoneMetaDirPath(OzoneConfiguration conf, - String path) { - conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServiceRuntimeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServiceRuntimeInfo.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServiceRuntimeInfo.java deleted file mode 100644 index bcd75f3..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServiceRuntimeInfo.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.server; - -/** - * Common runtime information for any service components. - * - * Note: it's intentional to not use MXBean or MBean as a suffix of the name. - * - * Most of the services extends the ServiceRuntimeInfoImpl class and also - * implements a specific MXBean interface which extends this interface. - * - * This inheritance from multiple path could confuse the jmx system and - * some jmx properties could be disappeared. - * - * The solution is to always extend this interface and use the jmx naming - * convention in the new interface.. - */ -public interface ServiceRuntimeInfo { - - /** - * Gets the version of Hadoop. - * - * @return the version - */ - String getVersion(); - - /** - * Get the version of software running on the Namenode. - * - * @return a string representing the version - */ - String getSoftwareVersion(); - - /** - * Get the compilation information which contains date, user and branch. - * - * @return the compilation information, as a JSON string. - */ - String getCompileInfo(); - - /** - * Gets the NN start time in milliseconds. - * - * @return the NN start time in msec - */ - long getStartedTimeInMillis(); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServiceRuntimeInfoImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServiceRuntimeInfoImpl.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServiceRuntimeInfoImpl.java deleted file mode 100644 index 36d6b64..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServiceRuntimeInfoImpl.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.server; - -import org.apache.hadoop.util.VersionInfo; - -/** - * Helper base class to report the standard version and runtime information. - * - */ -public class ServiceRuntimeInfoImpl implements ServiceRuntimeInfo { - - private long startedTimeInMillis; - - @Override - public String getVersion() { - return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision(); - } - - @Override - public String getSoftwareVersion() { - return VersionInfo.getVersion(); - } - - @Override - public String getCompileInfo() { - return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " - + VersionInfo.getBranch(); - } - - @Override - public long getStartedTimeInMillis() { - return startedTimeInMillis; - } - - public void setStartTime() { - startedTimeInMillis = System.currentTimeMillis(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java deleted file mode 100644 index 810c8b3..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -/** - * Identifier of an async event. - * - * @param <PAYLOAD> THe message payload type of this event. - */ -public interface Event<PAYLOAD> { - - /** - * The type of the event payload. Payload contains all the required data - * to process the event. - * - */ - Class<PAYLOAD> getPayloadType(); - - /** - * The human readable name of the event. - * - * Used for display in thread names - * and monitoring. - * - */ - String getName(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java deleted file mode 100644 index 4257839..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -/** - * Executors defined the way how an EventHandler should be called. - * <p> - * Executors are used only by the EventQueue and they do the thread separation - * between the caller and the EventHandler. - * <p> - * Executors should guarantee that only one thread is executing one - * EventHandler at the same time. - * - * @param <PAYLOAD> the payload type of the event. - */ -public interface EventExecutor<PAYLOAD> extends AutoCloseable { - - /** - * Process an event payload. - * - * @param handler the handler to process the payload - * @param eventPayload to be processed. - * @param publisher to send response/other message forward to the chain. - */ - void onMessage(EventHandler<PAYLOAD> handler, - PAYLOAD eventPayload, - EventPublisher - publisher); - - /** - * Return the number of the failed events. - */ - long failedEvents(); - - - /** - * Return the number of the processed events. - */ - long successfulEvents(); - - /** - * Return the number of the not-yet processed events. - */ - long queuedEvents(); - - /** - * The human readable name for the event executor. - * <p> - * Used in monitoring and logging. - * - */ - String getName(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java deleted file mode 100644 index f40fc9e..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -/** - * Processor to react on an event. - * - * EventExecutors should guarantee that the implementations are called only - * from one thread. - * - * @param <PAYLOAD> - */ -@FunctionalInterface -public interface EventHandler<PAYLOAD> { - - void onMessage(PAYLOAD payload, EventPublisher publisher); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java deleted file mode 100644 index a47fb57..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -/** - * Client interface to send a new event. - */ -public interface EventPublisher { - - <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void - fireEvent(EVENT_TYPE event, PAYLOAD payload); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java deleted file mode 100644 index 9aeab7b..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ /dev/null @@ -1,251 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Time; - -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * Simple async event processing utility. - * <p> - * Event queue handles a collection of event handlers and routes the incoming - * events to one (or more) event handler. - */ -public class EventQueue implements EventPublisher, AutoCloseable { - - private static final Logger LOG = - LoggerFactory.getLogger(EventQueue.class); - - private static final String EXECUTOR_NAME_SEPARATOR = "For"; - - private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors = - new HashMap<>(); - - private final AtomicLong queuedCount = new AtomicLong(0); - - private final AtomicLong eventCount = new AtomicLong(0); - - private boolean isRunning = true; - - public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler( - EVENT_TYPE event, EventHandler<PAYLOAD> handler) { - this.addHandler(event, handler, generateHandlerName(handler)); - } - - /** - * Add new handler to the event queue. - * <p> - * By default a separated single thread executor will be dedicated to - * deliver the events to the registered event handler. - * - * @param event Triggering event. - * @param handler Handler of event (will be called from a separated - * thread) - * @param handlerName The name of handler (should be unique together with - * the event name) - * @param <PAYLOAD> The type of the event payload. - * @param <EVENT_TYPE> The type of the event identifier. - */ - public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler( - EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) { - validateEvent(event); - Preconditions.checkNotNull(handler, "Handler name should not be null."); - String executorName = - StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR - + handlerName; - this.addHandler(event, new SingleThreadExecutor<>(executorName), handler); - } - - private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) { - Preconditions - .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR), - "Event name should not contain " + EXECUTOR_NAME_SEPARATOR - + " string."); - - } - - private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) { - if (!"".equals(handler.getClass().getSimpleName())) { - return handler.getClass().getSimpleName(); - } else { - return handler.getClass().getName(); - } - } - - /** - * Add event handler with custom executor. - * - * @param event Triggering event. - * @param executor The executor imlementation to deliver events from a - * separated threads. Please keep in your mind that - * registering metrics is the responsibility of the - * caller. - * @param handler Handler of event (will be called from a separated - * thread) - * @param <PAYLOAD> The type of the event payload. - * @param <EVENT_TYPE> The type of the event identifier. - */ - public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler( - EVENT_TYPE event, EventExecutor<PAYLOAD> executor, - EventHandler<PAYLOAD> handler) { - if (!isRunning) { - LOG.warn("Not adding handler for {}, EventQueue is not running", event); - return; - } - validateEvent(event); - executors.putIfAbsent(event, new HashMap<>()); - executors.get(event).putIfAbsent(executor, new ArrayList<>()); - - executors.get(event).get(executor).add(handler); - } - - - - /** - * Route an event with payload to the right listener(s). - * - * @param event The event identifier - * @param payload The payload of the event. - * @throws IllegalArgumentException If there is no EventHandler for - * the specific event. - */ - public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent( - EVENT_TYPE event, PAYLOAD payload) { - - if (!isRunning) { - LOG.warn("Processing of {} is skipped, EventQueue is not running", event); - return; - } - - Map<EventExecutor, List<EventHandler>> eventExecutorListMap = - this.executors.get(event); - - eventCount.incrementAndGet(); - if (eventExecutorListMap != null) { - - for (Map.Entry<EventExecutor, List<EventHandler>> executorAndHandlers : - eventExecutorListMap.entrySet()) { - - for (EventHandler handler : executorAndHandlers.getValue()) { - queuedCount.incrementAndGet(); - if (LOG.isDebugEnabled()) { - LOG.debug("Delivering event {} to executor/handler {}: {}", - event.getName(), - executorAndHandlers.getKey().getName(), - payload); - } - executorAndHandlers.getKey() - .onMessage(handler, payload, this); - - } - } - - } else { - LOG.warn("No event handler registered for event " + event); - } - - } - - /** - * This is just for unit testing, don't use it for production code. - * <p> - * It waits for all messages to be processed. If one event handler invokes an - * other one, the later one also should be finished. - * <p> - * Long counter overflow is not handled, therefore it's safe only for unit - * testing. - * <p> - * This method is just eventually consistent. In some cases it could return - * even if there are new messages in some of the handler. But in a simple - * case (one message) it will return only if the message is processed and - * all the dependent messages (messages which are sent by current handlers) - * are processed. - * - * @param timeout Timeout in seconds to wait for the processing. - */ - @VisibleForTesting - public void processAll(long timeout) { - long currentTime = Time.now(); - while (true) { - - if (!isRunning) { - LOG.warn("Processing of event skipped. EventQueue is not running"); - return; - } - - long processed = 0; - - Stream<EventExecutor> allExecutor = this.executors.values().stream() - .flatMap(handlerMap -> handlerMap.keySet().stream()); - - boolean allIdle = - allExecutor.allMatch(executor -> executor.queuedEvents() == executor - .successfulEvents() + executor.failedEvents()); - - if (allIdle) { - return; - } - - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - if (Time.now() > currentTime + timeout) { - throw new AssertionError( - "Messages are not processed in the given timeframe. Queued: " - + queuedCount.get() + " Processed: " + processed); - } - } - } - - public void close() { - - isRunning = false; - - Set<EventExecutor> allExecutors = this.executors.values().stream() - .flatMap(handlerMap -> handlerMap.keySet().stream()) - .collect(Collectors.toSet()); - - allExecutors.forEach(executor -> { - try { - executor.close(); - } catch (Exception ex) { - LOG.error("Can't close the executor " + executor.getName(), ex); - } - }); - } - - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java deleted file mode 100644 index ba5078b..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.ozone.lease.Lease; -import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException; -import org.apache.hadoop.ozone.lease.LeaseExpiredException; -import org.apache.hadoop.ozone.lease.LeaseManager; -import org.apache.hadoop.ozone.lease.LeaseNotFoundException; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.commons.collections.map.HashedMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Event watcher the (re)send a message after timeout. - * <p> - * Event watcher will send the tracked payload/event after a timeout period - * unless a confirmation from the original event (completion event) is arrived. - * - * @param <TIMEOUT_PAYLOAD> The type of the events which are tracked. - * @param <COMPLETION_PAYLOAD> The type of event which could cancel the - * tracking. - */ -@SuppressWarnings("CheckStyle") -public abstract class EventWatcher<TIMEOUT_PAYLOAD extends - IdentifiableEventPayload, - COMPLETION_PAYLOAD extends IdentifiableEventPayload> { - - private static final Logger LOG = LoggerFactory.getLogger(EventWatcher.class); - - private final Event<TIMEOUT_PAYLOAD> startEvent; - - private final Event<COMPLETION_PAYLOAD> completionEvent; - - private final LeaseManager<Long> leaseManager; - - private final EventWatcherMetrics metrics; - - private final String name; - - protected final Map<Long, TIMEOUT_PAYLOAD> trackedEventsByID = - new ConcurrentHashMap<>(); - - protected final Set<TIMEOUT_PAYLOAD> trackedEvents = new HashSet<>(); - - private final Map<Long, Long> startTrackingTimes = new HashedMap(); - - public EventWatcher(String name, Event<TIMEOUT_PAYLOAD> startEvent, - Event<COMPLETION_PAYLOAD> completionEvent, - LeaseManager<Long> leaseManager) { - this.startEvent = startEvent; - this.completionEvent = completionEvent; - this.leaseManager = leaseManager; - this.metrics = new EventWatcherMetrics(); - Preconditions.checkNotNull(name); - if (name.equals("")) { - name = getClass().getSimpleName(); - } - if (name.equals("")) { - //for anonymous inner classes - name = getClass().getName(); - } - this.name = name; - } - - public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent, - Event<COMPLETION_PAYLOAD> completionEvent, - LeaseManager<Long> leaseManager) { - this("", startEvent, completionEvent, leaseManager); - } - - public void start(EventQueue queue) { - - queue.addHandler(startEvent, this::handleStartEvent); - - queue.addHandler(completionEvent, (completionPayload, publisher) -> { - try { - handleCompletion(completionPayload, publisher); - } catch (LeaseNotFoundException e) { - //It's already done. Too late, we already retried it. - //Not a real problem. - LOG.warn("Completion event without active lease. Id={}", - completionPayload.getId()); - } - }); - - MetricsSystem ms = DefaultMetricsSystem.instance(); - ms.register(name, "EventWatcher metrics", metrics); - } - - private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload, - EventPublisher publisher) { - metrics.incrementTrackedEvents(); - long identifier = payload.getId(); - startTrackingTimes.put(identifier, System.currentTimeMillis()); - - trackedEventsByID.put(identifier, payload); - trackedEvents.add(payload); - try { - Lease<Long> lease = leaseManager.acquire(identifier); - try { - lease.registerCallBack(() -> { - handleTimeout(publisher, identifier); - return null; - }); - - } catch (LeaseExpiredException e) { - handleTimeout(publisher, identifier); - } - } catch (LeaseAlreadyExistException e) { - //No problem at all. But timer is not reset. - } - } - - protected synchronized void handleCompletion(COMPLETION_PAYLOAD - completionPayload, EventPublisher publisher) throws - LeaseNotFoundException { - metrics.incrementCompletedEvents(); - long id = completionPayload.getId(); - leaseManager.release(id); - TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(id); - trackedEvents.remove(payload); - long originalTime = startTrackingTimes.remove(id); - metrics.updateFinishingTime(System.currentTimeMillis() - originalTime); - onFinished(publisher, payload); - } - - private synchronized void handleTimeout(EventPublisher publisher, - long identifier) { - metrics.incrementTimedOutEvents(); - TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(identifier); - trackedEvents.remove(payload); - startTrackingTimes.remove(payload.getId()); - onTimeout(publisher, payload); - } - - - /** - * Check if a specific payload is in-progress. - */ - public synchronized boolean contains(TIMEOUT_PAYLOAD payload) { - return trackedEvents.contains(payload); - } - - public synchronized boolean remove(TIMEOUT_PAYLOAD payload) { - try { - leaseManager.release(payload.getId()); - } catch (LeaseNotFoundException e) { - LOG.warn("Completion event without active lease. Id={}", - payload.getId()); - } - trackedEventsByID.remove(payload.getId()); - return trackedEvents.remove(payload); - - } - - protected abstract void onTimeout( - EventPublisher publisher, TIMEOUT_PAYLOAD payload); - - protected abstract void onFinished( - EventPublisher publisher, TIMEOUT_PAYLOAD payload); - - public List<TIMEOUT_PAYLOAD> getTimeoutEvents( - Predicate<? super TIMEOUT_PAYLOAD> predicate) { - return trackedEventsByID.values().stream().filter(predicate) - .collect(Collectors.toList()); - } - - @VisibleForTesting - protected EventWatcherMetrics getMetrics() { - return metrics; - } - - /** - * Returns a tracked event to which the specified id is - * mapped, or {@code null} if there is no mapping for the id. - */ - public TIMEOUT_PAYLOAD getTrackedEventbyId(long id) { - return trackedEventsByID.get(id); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java deleted file mode 100644 index 1db81a9..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.apache.hadoop.metrics2.lib.MutableRate; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Metrics for any event watcher. - */ -public class EventWatcherMetrics { - - @Metric() - private MutableCounterLong trackedEvents; - - @Metric() - private MutableCounterLong timedOutEvents; - - @Metric() - private MutableCounterLong completedEvents; - - @Metric() - private MutableRate completionTime; - - public void incrementTrackedEvents() { - trackedEvents.incr(); - } - - public void incrementTimedOutEvents() { - timedOutEvents.incr(); - } - - public void incrementCompletedEvents() { - completedEvents.incr(); - } - - @VisibleForTesting - public void updateFinishingTime(long duration) { - completionTime.add(duration); - } - - @VisibleForTesting - public MutableCounterLong getTrackedEvents() { - return trackedEvents; - } - - @VisibleForTesting - public MutableCounterLong getTimedOutEvents() { - return timedOutEvents; - } - - @VisibleForTesting - public MutableCounterLong getCompletedEvents() { - return completedEvents; - } - - @VisibleForTesting - public MutableRate getCompletionTime() { - return completionTime; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IdentifiableEventPayload.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IdentifiableEventPayload.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IdentifiableEventPayload.java deleted file mode 100644 index 3faa8e7..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IdentifiableEventPayload.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -/** - * Event with an additional unique identifier. - * - */ -public interface IdentifiableEventPayload { - - long getId(); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java deleted file mode 100644 index 3253f2d..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; - -/** - * Simple EventExecutor to call all the event handler one-by-one. - * - * @param <T> - */ -@Metrics(context = "EventQueue") -public class SingleThreadExecutor<T> implements EventExecutor<T> { - - public static final String THREAD_NAME_PREFIX = "EventQueue"; - - private static final Logger LOG = - LoggerFactory.getLogger(SingleThreadExecutor.class); - - private final String name; - - private final ThreadPoolExecutor executor; - - @Metric - private MutableCounterLong queued; - - @Metric - private MutableCounterLong done; - - @Metric - private MutableCounterLong failed; - - /** - * Create SingleThreadExecutor. - * - * @param name Unique name used in monitoring and metrics. - */ - public SingleThreadExecutor(String name) { - this.name = name; - DefaultMetricsSystem.instance() - .register("EventQueue" + name, "Event Executor metrics ", this); - - LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); - executor = - new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, - runnable -> { - Thread thread = new Thread(runnable); - thread.setName(THREAD_NAME_PREFIX + "-" + name); - return thread; - }); - - } - - @Override - public void onMessage(EventHandler<T> handler, T message, EventPublisher - publisher) { - queued.incr(); - executor.execute(() -> { - try { - handler.onMessage(message, publisher); - done.incr(); - } catch (Exception ex) { - LOG.error("Error on execution message {}", message, ex); - failed.incr(); - } - }); - } - - @Override - public long failedEvents() { - return failed.value(); - } - - @Override - public long successfulEvents() { - return done.value(); - } - - @Override - public long queuedEvents() { - return queued.value(); - } - - @Override - public void close() { - executor.shutdown(); - } - - @Override - public String getName() { - return name; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java deleted file mode 100644 index 27bba3a..0000000 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.server.events; - -/** - * Basic event implementation to implement custom events. - * - * @param <T> - */ -public class TypedEvent<T> implements Event<T> { - - private final Class<T> payloadType; - - private final String name; - - public TypedEvent(Class<T> payloadType, String name) { - this.payloadType = payloadType; - this.name = name; - } - - public TypedEvent(Class<T> payloadType) { - this.payloadType = payloadType; - this.name = payloadType.getSimpleName(); - } - - @Override - public Class<T> getPayloadType() { - return payloadType; - } - - @Override - public String getName() { - return name; - } - - @Override - public String toString() { - return "TypedEvent{" + - "payloadType=" + payloadType.getSimpleName() + - ", name='" + name + '\'' + - '}'; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
