http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java deleted file mode 100644 index 7686df3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.scm; - -/* - * This package contains StorageContainerManager classes. - */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java deleted file mode 100644 index 28e5267..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.scm.pipelines; - - -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; -import org.apache.hadoop.scm.container.common.helpers.PipelineChannel; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Manage Ozone pipelines. - */ -public abstract class PipelineManager { - private static final Logger LOG = - LoggerFactory.getLogger(PipelineManager.class); - private final List<PipelineChannel> activePipelineChannels; - private final AtomicInteger conduitsIndex; - - public PipelineManager() { - activePipelineChannels = new LinkedList<>(); - conduitsIndex = new AtomicInteger(0); - } - - /** - * This function is called by the Container Manager while allocating a new - * container. The client specifies what kind of replication pipeline is - * needed and based on the replication type in the request appropriate - * Interface is invoked. - * - * @param containerName Name of the container - * @param replicationFactor - Replication Factor - * @return a Pipeline. - */ - public synchronized final Pipeline getPipeline(String containerName, - ReplicationFactor replicationFactor, ReplicationType replicationType) - throws IOException { - /** - * In the Ozone world, we have a very simple policy. - * - * 1. Try to create a pipelineChannel if there are enough free nodes. - * - * 2. This allows all nodes to part of a pipelineChannel quickly. - * - * 3. if there are not enough free nodes, return conduits in a - * round-robin fashion. - * - * TODO: Might have to come up with a better algorithm than this. 
- * Create a new placement policy that returns conduits in round robin - * fashion. - */ - PipelineChannel pipelineChannel = - allocatePipelineChannel(replicationFactor); - if (pipelineChannel != null) { - LOG.debug("created new pipelineChannel:{} for container:{}", - pipelineChannel.getName(), containerName); - activePipelineChannels.add(pipelineChannel); - } else { - pipelineChannel = - findOpenPipelineChannel(replicationType, replicationFactor); - if (pipelineChannel != null) { - LOG.debug("re-used pipelineChannel:{} for container:{}", - pipelineChannel.getName(), containerName); - } - } - if (pipelineChannel == null) { - LOG.error("Get pipelineChannel call failed. We are not able to find" + - "free nodes or operational pipelineChannel."); - return null; - } else { - return new Pipeline(containerName, pipelineChannel); - } - } - - protected int getReplicationCount(ReplicationFactor factor) { - switch (factor) { - case ONE: - return 1; - case THREE: - return 3; - default: - throw new IllegalArgumentException("Unexpected replication count"); - } - } - - public abstract PipelineChannel allocatePipelineChannel( - ReplicationFactor replicationFactor) throws IOException; - - /** - * Find a PipelineChannel that is operational. - * - * @return - Pipeline or null - */ - private PipelineChannel findOpenPipelineChannel( - ReplicationType type, ReplicationFactor factor) { - PipelineChannel pipelineChannel = null; - final int sentinal = -1; - if (activePipelineChannels.size() == 0) { - LOG.error("No Operational conduits found. Returning null."); - return null; - } - int startIndex = getNextIndex(); - int nextIndex = sentinal; - for (; startIndex != nextIndex; nextIndex = getNextIndex()) { - // Just walk the list in a circular way. - PipelineChannel temp = - activePipelineChannels - .get(nextIndex != sentinal ? nextIndex : startIndex); - // if we find an operational pipelineChannel just return that. 
- if ((temp.getLifeCycleState() == LifeCycleState.OPEN) && - (temp.getFactor() == factor) && (temp.getType() == type)) { - pipelineChannel = temp; - break; - } - } - return pipelineChannel; - } - - /** - * gets the next index of the PipelineChannel to get. - * - * @return index in the link list to get. - */ - private int getNextIndex() { - return conduitsIndex.incrementAndGet() % activePipelineChannels.size(); - } - - /** - * Creates a pipeline from a specified set of Nodes. - * @param pipelineID - Name of the pipeline - * @param datanodes - The list of datanodes that make this pipeline. - */ - public abstract void createPipeline(String pipelineID, - List<DatanodeID> datanodes) throws IOException; - - /** - * Close the pipeline with the given clusterId. - */ - public abstract void closePipeline(String pipelineID) throws IOException; - - /** - * list members in the pipeline . - * @return the datanode - */ - public abstract List<DatanodeID> getMembers(String pipelineID) - throws IOException; - - /** - * Update the datanode list of the pipeline. - */ - public abstract void updatePipeline(String pipelineID, - List<DatanodeID> newDatanodes) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java deleted file mode 100644 index 7759f9c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.scm.pipelines; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; -import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; -import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom; -import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.ozone.scm.pipelines.ratis.RatisManagerImpl; -import org.apache.hadoop.ozone.scm.pipelines.standalone.StandaloneManagerImpl; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.container.common.helpers.PipelineChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; 
-import java.util.List; -import java.util.stream.Collectors; - -/** - * Sends the request to the right pipeline manager. - */ -public class PipelineSelector { - private static final Logger LOG = - LoggerFactory.getLogger(PipelineSelector.class); - private final ContainerPlacementPolicy placementPolicy; - private final NodeManager nodeManager; - private final Configuration conf; - private final RatisManagerImpl ratisManager; - private final StandaloneManagerImpl standaloneManager; - private final long containerSize; - - /** - * Constructs a pipeline Selector. - * - * @param nodeManager - node manager - * @param conf - Ozone Config - */ - public PipelineSelector(NodeManager nodeManager, Configuration conf) { - this.nodeManager = nodeManager; - this.conf = conf; - this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf); - this.containerSize = OzoneConsts.GB * this.conf.getInt( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); - this.standaloneManager = - new StandaloneManagerImpl(this.nodeManager, placementPolicy, - containerSize); - this.ratisManager = - new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize, - conf); - } - - /** - * Translates a list of nodes, ordered such that the first is the leader, into - * a corresponding {@link Pipeline} object. - * - * @param nodes - list of datanodes on which we will allocate the container. - * The first of the list will be the leader node. 
- * @return pipeline corresponding to nodes - */ - public static PipelineChannel newPipelineFromNodes(List<DatanodeID> nodes, - LifeCycleState state, ReplicationType replicationType, - ReplicationFactor replicationFactor, String name) { - Preconditions.checkNotNull(nodes); - Preconditions.checkArgument(nodes.size() > 0); - String leaderId = nodes.get(0).getDatanodeUuid(); - PipelineChannel - pipelineChannel = new PipelineChannel(leaderId, state, replicationType, - replicationFactor, name); - for (DatanodeID node : nodes) { - pipelineChannel.addMember(node); - } - return pipelineChannel; - } - - /** - * Create pluggable container placement policy implementation instance. - * - * @param nodeManager - SCM node manager. - * @param conf - configuration. - * @return SCM container placement policy implementation instance. - */ - @SuppressWarnings("unchecked") - private static ContainerPlacementPolicy createContainerPlacementPolicy( - final NodeManager nodeManager, final Configuration conf) { - Class<? extends ContainerPlacementPolicy> implClass = - (Class<? extends ContainerPlacementPolicy>) conf.getClass( - ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, - SCMContainerPlacementRandom.class); - - try { - Constructor<? extends ContainerPlacementPolicy> ctor = - implClass.getDeclaredConstructor(NodeManager.class, - Configuration.class); - return ctor.newInstance(nodeManager, conf); - } catch (RuntimeException e) { - throw e; - } catch (InvocationTargetException e) { - throw new RuntimeException(implClass.getName() - + " could not be constructed.", e.getCause()); - } catch (Exception e) { - LOG.error("Unhandled exception occurred, Placement policy will not be " + - "functional."); - throw new IllegalArgumentException("Unable to load " + - "ContainerPlacementPolicy", e); - } - } - - /** - * Return the pipeline manager from the replication type. - * - * @param replicationType - Replication Type Enum. - * @return pipeline Manager. 
- * @throws IllegalArgumentException If an pipeline type gets added - * and this function is not modified we will throw. - */ - private PipelineManager getPipelineManager(ReplicationType replicationType) - throws IllegalArgumentException { - switch (replicationType) { - case RATIS: - return this.ratisManager; - case STAND_ALONE: - return this.standaloneManager; - case CHAINED: - throw new IllegalArgumentException("Not implemented yet"); - default: - throw new IllegalArgumentException("Unexpected enum found. Does not" + - " know how to handle " + replicationType.toString()); - } - - } - - /** - * This function is called by the Container Manager while allocating a new - * container. The client specifies what kind of replication pipeline is needed - * and based on the replication type in the request appropriate Interface is - * invoked. - */ - - public Pipeline getReplicationPipeline(ReplicationType replicationType, - OzoneProtos.ReplicationFactor replicationFactor, String containerName) - throws IOException { - PipelineManager manager = getPipelineManager(replicationType); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Getting replication pipeline for {} : Replication {}", - containerName, replicationFactor.toString()); - return manager. - getPipeline(containerName, replicationFactor, replicationType); - } - - /** - * Creates a pipeline from a specified set of Nodes. - */ - - public void createPipeline(ReplicationType replicationType, String - pipelineID, List<DatanodeID> datanodes) throws IOException { - PipelineManager manager = getPipelineManager(replicationType); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID, - datanodes.stream().map(DatanodeID::toString) - .collect(Collectors.joining(","))); - manager.createPipeline(pipelineID, datanodes); - } - - /** - * Close the pipeline with the given clusterId. 
- */ - - public void closePipeline(ReplicationType replicationType, String - pipelineID) throws IOException { - PipelineManager manager = getPipelineManager(replicationType); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Closing pipeline. pipelineID: {}", pipelineID); - manager.closePipeline(pipelineID); - } - - /** - * list members in the pipeline . - */ - - public List<DatanodeID> getDatanodes(ReplicationType replicationType, - String pipelineID) throws IOException { - PipelineManager manager = getPipelineManager(replicationType); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Getting data nodes from pipeline : {}", pipelineID); - return manager.getMembers(pipelineID); - } - - /** - * Update the datanodes in the list of the pipeline. - */ - - public void updateDatanodes(ReplicationType replicationType, String - pipelineID, List<DatanodeID> newDatanodes) throws IOException { - PipelineManager manager = getPipelineManager(replicationType); - Preconditions.checkNotNull(manager, "Found invalid pipeline manager"); - LOG.debug("Updating pipeline: {} with new nodes:{}", pipelineID, - newDatanodes.stream().map(DatanodeID::toString) - .collect(Collectors.joining(","))); - manager.updatePipeline(pipelineID, newDatanodes); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/package-info.java deleted file mode 100644 index c2a3b54..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/package-info.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.scm.pipelines; -/** - Ozone supports the notion of different kind of pipelines. - That means that we can have a replication pipeline build on - Ratis, Standalone or some other protocol. All Pipeline managers - the entities in charge of pipelines reside in the package. - - Here is the high level Arch. - - 1. A pipeline selector class is instantiated in the Container manager class. - - 2. A client when creating a container -- will specify what kind of - replication type it wants to use. We support 2 types now, Ratis and StandAlone. - - 3. Based on the replication type, the pipeline selector class asks the - corresponding pipeline manager for a pipeline. - - 4. We have supported the ability for clients to specify a set of nodes in - the pipeline or rely in the pipeline manager to select the datanodes if they - are not specified. 
- */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java deleted file mode 100644 index 7903b0b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.scm.pipelines.ratis; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; -import org.apache.hadoop.ozone.scm.container.placement.algorithms - .ContainerPlacementPolicy; -import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.ozone.scm.pipelines.PipelineManager; -import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector; -import org.apache.hadoop.scm.XceiverClientRatis; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.container.common.helpers.PipelineChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -/** - * Implementation of {@link PipelineManager}. - * - * TODO : Introduce a state machine. - */ -public class RatisManagerImpl extends PipelineManager { - private static final Logger LOG = - LoggerFactory.getLogger(RatisManagerImpl.class); - private static final String PREFIX = "Ratis-"; - private final Configuration conf; - private final NodeManager nodeManager; - private final Set<DatanodeID> ratisMembers; - - /** - * Constructs a Ratis Pipeline Manager. - * - * @param nodeManager - */ - public RatisManagerImpl(NodeManager nodeManager, - ContainerPlacementPolicy placementPolicy, long size, Configuration conf) { - super(); - this.conf = conf; - this.nodeManager = nodeManager; - ratisMembers = new HashSet<>(); - } - - /** - * Allocates a new ratis PipelineChannel from the free nodes. 
- * - * @param factor - One or Three - * @return PipelineChannel. - */ - public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { - List<DatanodeID> newNodesList = new LinkedList<>(); - List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY); - int count = getReplicationCount(factor); - //TODO: Add Raft State to the Nodes, so we can query and skip nodes from - // data from datanode instead of maintaining a set. - for (DatanodeID datanode : datanodes) { - Preconditions.checkNotNull(datanode); - if (!ratisMembers.contains(datanode)) { - newNodesList.add(datanode); - if (newNodesList.size() == count) { - // once a datanode has been added to a pipeline, exclude it from - // further allocations - ratisMembers.addAll(newNodesList); - LOG.info("Allocating a new ratis pipelineChannel of size: {}", count); - // Start all channel names with "Ratis", easy to grep the logs. - String conduitName = PREFIX + - UUID.randomUUID().toString().substring(PREFIX.length()); - PipelineChannel pipelineChannel = - PipelineSelector.newPipelineFromNodes(newNodesList, - LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName); - Pipeline pipeline = - new Pipeline("setup", pipelineChannel); - try (XceiverClientRatis client = - XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { - client.createPipeline(pipeline.getPipelineName(), newNodesList); - } catch (IOException e) { - return null; - } - return pipelineChannel; - } - } - } - return null; - } - - /** - * Creates a pipeline from a specified set of Nodes. - * - * @param pipelineID - Name of the pipeline - * @param datanodes - The list of datanodes that make this pipeline. - */ - @Override - public void createPipeline(String pipelineID, List<DatanodeID> datanodes) { - - } - - /** - * Close the pipeline with the given clusterId. - * - * @param pipelineID - */ - @Override - public void closePipeline(String pipelineID) throws IOException { - - } - - /** - * list members in the pipeline . 
- * - * @param pipelineID - * @return the datanode - */ - @Override - public List<DatanodeID> getMembers(String pipelineID) throws IOException { - return null; - } - - /** - * Update the datanode list of the pipeline. - * - * @param pipelineID - * @param newDatanodes - */ - @Override - public void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes) - throws IOException { - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/package-info.java deleted file mode 100644 index 6fe9b28..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.ozone.scm.pipelines.ratis; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java deleted file mode 100644 index ef37926..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.scm.pipelines.standalone; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; -import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; -import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.ozone.scm.pipelines.PipelineManager; -import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector; -import org.apache.hadoop.scm.container.common.helpers.PipelineChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.UUID; -import java.util.Set; -import java.util.HashSet; -import java.util.LinkedList; - -/** - * Standalone Manager Impl to prove that pluggable interface - * works with current tests. - */ -public class StandaloneManagerImpl extends PipelineManager { - private static final Logger LOG = - LoggerFactory.getLogger(StandaloneManagerImpl.class); - private final NodeManager nodeManager; - private final ContainerPlacementPolicy placementPolicy; - private final long containerSize; - private final Set<DatanodeID> standAloneMembers; - - /** - * Constructor for Standalone Node Manager Impl. - * @param nodeManager - Node Manager. - * @param placementPolicy - Placement Policy - * @param containerSize - Container Size. 
- */ - public StandaloneManagerImpl(NodeManager nodeManager, - ContainerPlacementPolicy placementPolicy, long containerSize) { - super(); - this.nodeManager = nodeManager; - this.placementPolicy = placementPolicy; - this.containerSize = containerSize; - this.standAloneMembers = new HashSet<>(); - } - - - /** - * Allocates a new standalone PipelineChannel from the free nodes. - * - * @param factor - One - * @return PipelineChannel. - */ - public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { - List<DatanodeID> newNodesList = new LinkedList<>(); - List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY); - int count = getReplicationCount(factor); - for (DatanodeID datanode : datanodes) { - Preconditions.checkNotNull(datanode); - if (!standAloneMembers.contains(datanode)) { - newNodesList.add(datanode); - if (newNodesList.size() == count) { - // once a datanode has been added to a pipeline, exclude it from - // further allocations - standAloneMembers.addAll(newNodesList); - LOG.info("Allocating a new standalone pipeline channel of size: {}", - count); - String channelName = - "SA-" + UUID.randomUUID().toString().substring(3); - return PipelineSelector.newPipelineFromNodes(newNodesList, - LifeCycleState.OPEN, ReplicationType.STAND_ALONE, - ReplicationFactor.ONE, channelName); - } - } - } - return null; - } - - /** - * Creates a pipeline from a specified set of Nodes. - * - * @param pipelineID - Name of the pipeline - * @param datanodes - The list of datanodes that make this pipeline. - */ - @Override - public void createPipeline(String pipelineID, List<DatanodeID> datanodes) { - //return newPipelineFromNodes(datanodes, pipelineID); - } - - /** - * Close the pipeline with the given clusterId. - * - * @param pipelineID - */ - @Override - public void closePipeline(String pipelineID) throws IOException { - - } - - /** - * list members in the pipeline . 
- * - * @param pipelineID - * @return the datanode - */ - @Override - public List<DatanodeID> getMembers(String pipelineID) throws IOException { - return null; - } - - /** - * Update the datanode list of the pipeline. - * - * @param pipelineID - * @param newDatanodes - */ - @Override - public void updatePipeline(String pipelineID, List<DatanodeID> - newDatanodes) throws IOException { - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/package-info.java deleted file mode 100644 index 7e6393a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.ozone.scm.pipelines.standalone; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/package-info.java deleted file mode 100644 index 27fd32b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/ratis/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.scm.ratis; - -/** - * This package contains classes related to Apache Ratis for SCM. 
- */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Freon.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Freon.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Freon.java deleted file mode 100644 index bec5d87..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Freon.java +++ /dev/null @@ -1,1146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.ozone.tools; - -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.UniformReservoir; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.RandomStringUtils; -import org.apache.commons.lang.time.DurationFormatUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.client.*; -import org.apache.hadoop.ozone.client.io.OzoneInputStream; -import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; - -import static java.lang.Math.min; - -/** - * Freon - A tool to populate ozone with data for testing.<br> - * This is not a map-reduce program and this is not for benchmarking - * Ozone write 
throughput.<br> - * It supports both online and offline modes. Default mode is offline, - * <i>-mode</i> can be used to change the mode. - * <p> - * In online mode, active internet connection is required, - * common crawl data from AWS will be used.<br> - * Default source is:<br> - * https://commoncrawl.s3.amazonaws.com/crawl-data/ - * CC-MAIN-2017-17/warc.paths.gz<br> - * (it contains the path to actual data segment)<br> - * User can override this using <i>-source</i>. - * The following values are derived from URL of Common Crawl data - * <ul> - * <li>Domain will be used as Volume</li> - * <li>URL will be used as Bucket</li> - * <li>FileName will be used as Key</li> - * </ul></p> - * In offline mode, the data will be random bytes and - * size of data will be 10 KB.<br> - * <ul> - * <li>Default number of Volumes 10, <i>-numOfVolumes</i> - * can be used to override</li> - * <li>Default number of Buckets per Volume 1000, <i>-numOfBuckets</i> - * can be used to override</li> - * <li>Default number of Keys per Bucket 500000, <i>-numOfKeys</i> - * can be used to override</li> - * </ul> - */ -public final class Freon extends Configured implements Tool { - - enum FreonOps { - VOLUME_CREATE, - BUCKET_CREATE, - KEY_CREATE, - KEY_WRITE - } - - private static final String HELP = "help"; - private static final String MODE = "mode"; - private static final String SOURCE = "source"; - private static final String VALIDATE_WRITE = "validateWrites"; - private static final String JSON_WRITE_DIRECTORY = "jsonDir"; - private static final String NUM_OF_THREADS = "numOfThreads"; - private static final String NUM_OF_VOLUMES = "numOfVolumes"; - private static final String NUM_OF_BUCKETS = "numOfBuckets"; - private static final String NUM_OF_KEYS = "numOfKeys"; - private static final String KEY_SIZE = "keySize"; - private static final String RATIS = "ratis"; - - private static final String MODE_DEFAULT = "offline"; - private static final String SOURCE_DEFAULT = - 
"https://commoncrawl.s3.amazonaws.com/" + - "crawl-data/CC-MAIN-2017-17/warc.paths.gz"; - private static final String NUM_OF_THREADS_DEFAULT = "10"; - private static final String NUM_OF_VOLUMES_DEFAULT = "10"; - private static final String NUM_OF_BUCKETS_DEFAULT = "1000"; - private static final String NUM_OF_KEYS_DEFAULT = "500000"; - private static final String DURATION_FORMAT = "HH:mm:ss,SSS"; - - private static final int KEY_SIZE_DEFAULT = 10240; - private static final int QUANTILES = 10; - - private static final Logger LOG = - LoggerFactory.getLogger(Freon.class); - - private boolean printUsage = false; - private boolean completed = false; - private boolean exception = false; - - private String mode; - private String source; - private String numOfThreads; - private String numOfVolumes; - private String numOfBuckets; - private String numOfKeys; - private String jsonDir; - private boolean useRatis; - private ReplicationType type; - private ReplicationFactor factor; - - private int threadPoolSize; - private int keySize; - private byte[] keyValue = null; - - private boolean validateWrites; - - private OzoneClient ozoneClient; - private ObjectStore objectStore; - private ExecutorService processor; - - private long startTime; - private long jobStartTime; - - private AtomicLong volumeCreationTime; - private AtomicLong bucketCreationTime; - private AtomicLong keyCreationTime; - private AtomicLong keyWriteTime; - - private AtomicLong totalBytesWritten; - - private AtomicInteger numberOfVolumesCreated; - private AtomicInteger numberOfBucketsCreated; - private AtomicLong numberOfKeysAdded; - - private Long totalWritesValidated; - private Long writeValidationSuccessCount; - private Long writeValidationFailureCount; - - private BlockingQueue<KeyValue> validationQueue; - private ArrayList<Histogram> histograms = new ArrayList<>(); - - @VisibleForTesting - Freon(Configuration conf) throws IOException { - startTime = System.nanoTime(); - jobStartTime = 
System.currentTimeMillis(); - volumeCreationTime = new AtomicLong(); - bucketCreationTime = new AtomicLong(); - keyCreationTime = new AtomicLong(); - keyWriteTime = new AtomicLong(); - totalBytesWritten = new AtomicLong(); - numberOfVolumesCreated = new AtomicInteger(); - numberOfBucketsCreated = new AtomicInteger(); - numberOfKeysAdded = new AtomicLong(); - ozoneClient = OzoneClientFactory.getClient(conf); - objectStore = ozoneClient.getObjectStore(); - for (FreonOps ops : FreonOps.values()) { - histograms.add(ops.ordinal(), new Histogram(new UniformReservoir())); - } - } - - /** - * @param args arguments - */ - public static void main(String[] args) throws Exception { - Configuration conf = new OzoneConfiguration(); - int res = ToolRunner.run(conf, new Freon(conf), args); - System.exit(res); - } - - @Override - public int run(String[] args) throws Exception { - GenericOptionsParser parser = new GenericOptionsParser(getConf(), - getOptions(), args); - parseOptions(parser.getCommandLine()); - if (printUsage) { - usage(); - return 0; - } - - keyValue = - DFSUtil.string2Bytes(RandomStringUtils.randomAscii(keySize - 36)); - - LOG.info("Number of Threads: " + numOfThreads); - threadPoolSize = - min(Integer.parseInt(numOfVolumes), Integer.parseInt(numOfThreads)); - processor = Executors.newFixedThreadPool(threadPoolSize); - addShutdownHook(); - if (mode.equals("online")) { - LOG.info("Mode: online"); - throw new UnsupportedOperationException("Not yet implemented."); - } else { - LOG.info("Mode: offline"); - LOG.info("Number of Volumes: {}.", numOfVolumes); - LOG.info("Number of Buckets per Volume: {}.", numOfBuckets); - LOG.info("Number of Keys per Bucket: {}.", numOfKeys); - LOG.info("Key size: {} bytes", keySize); - for (int i = 0; i < Integer.parseInt(numOfVolumes); i++) { - String volume = "vol-" + i + "-" + - RandomStringUtils.randomNumeric(5); - processor.submit(new OfflineProcessor(volume)); - } - } - Thread validator = null; - if (validateWrites) { - 
totalWritesValidated = 0L; - writeValidationSuccessCount = 0L; - writeValidationFailureCount = 0L; - - validationQueue = - new ArrayBlockingQueue<>(Integer.parseInt(numOfThreads)); - validator = new Thread(new Validator()); - validator.start(); - LOG.info("Data validation is enabled."); - } - Thread progressbar = getProgressBarThread(); - LOG.info("Starting progress bar Thread."); - progressbar.start(); - processor.shutdown(); - processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); - completed = true; - progressbar.join(); - if (validateWrites) { - validator.join(); - } - ozoneClient.close(); - return 0; - } - - private Options getOptions() { - Options options = new Options(); - - OptionBuilder.withDescription("prints usage."); - Option optHelp = OptionBuilder.create(HELP); - - OptionBuilder.withArgName("online | offline"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies the mode of " + - "Freon run."); - Option optMode = OptionBuilder.create(MODE); - - OptionBuilder.withArgName("source url"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies the URL of s3 " + - "commoncrawl warc file to be used when the mode is online."); - Option optSource = OptionBuilder.create(SOURCE); - - OptionBuilder.withDescription("do random validation of " + - "data written into ozone, only subset of data is validated."); - Option optValidateWrite = OptionBuilder.create(VALIDATE_WRITE); - - - OptionBuilder.withDescription("directory where json is created"); - OptionBuilder.hasArg(); - Option optJsonDir = OptionBuilder.create(JSON_WRITE_DIRECTORY); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("number of threads to be launched " + - "for the run"); - Option optNumOfThreads = OptionBuilder.create(NUM_OF_THREADS); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies number of Volumes to be " + - "created in offline mode"); - 
Option optNumOfVolumes = OptionBuilder.create(NUM_OF_VOLUMES); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies number of Buckets to be " + - "created per Volume in offline mode"); - Option optNumOfBuckets = OptionBuilder.create(NUM_OF_BUCKETS); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies number of Keys to be " + - "created per Bucket in offline mode"); - Option optNumOfKeys = OptionBuilder.create(NUM_OF_KEYS); - - OptionBuilder.withArgName("value"); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("specifies the size of Key in bytes to be " + - "created in offline mode"); - Option optKeySize = OptionBuilder.create(KEY_SIZE); - - OptionBuilder.withArgName(RATIS); - OptionBuilder.hasArg(); - OptionBuilder.withDescription("Use Ratis as the default replication " + - "strategy"); - Option optRatis = OptionBuilder.create(RATIS); - - options.addOption(optHelp); - options.addOption(optMode); - options.addOption(optSource); - options.addOption(optValidateWrite); - options.addOption(optJsonDir); - options.addOption(optNumOfThreads); - options.addOption(optNumOfVolumes); - options.addOption(optNumOfBuckets); - options.addOption(optNumOfKeys); - options.addOption(optKeySize); - options.addOption(optRatis); - return options; - } - - private void parseOptions(CommandLine cmdLine) { - printUsage = cmdLine.hasOption(HELP); - - mode = cmdLine.getOptionValue(MODE, MODE_DEFAULT); - - source = cmdLine.getOptionValue(SOURCE, SOURCE_DEFAULT); - - numOfThreads = - cmdLine.getOptionValue(NUM_OF_THREADS, NUM_OF_THREADS_DEFAULT); - - validateWrites = cmdLine.hasOption(VALIDATE_WRITE); - - jsonDir = cmdLine.getOptionValue(JSON_WRITE_DIRECTORY); - - numOfVolumes = - cmdLine.getOptionValue(NUM_OF_VOLUMES, NUM_OF_VOLUMES_DEFAULT); - - numOfBuckets = - cmdLine.getOptionValue(NUM_OF_BUCKETS, NUM_OF_BUCKETS_DEFAULT); - - numOfKeys = 
cmdLine.getOptionValue(NUM_OF_KEYS, NUM_OF_KEYS_DEFAULT); - - keySize = cmdLine.hasOption(KEY_SIZE) ? - Integer.parseInt(cmdLine.getOptionValue(KEY_SIZE)) : KEY_SIZE_DEFAULT; - if (keySize < 1024) { - throw new IllegalArgumentException( - "keySize can not be less than 1024 bytes"); - } - - useRatis = cmdLine.hasOption(RATIS); - - type = ReplicationType.STAND_ALONE; - factor = ReplicationFactor.ONE; - - if (useRatis) { - type = ReplicationType.RATIS; - int replicationFactor = Integer.parseInt(cmdLine.getOptionValue(RATIS)); - switch (replicationFactor) { - case 1: - factor = ReplicationFactor.ONE; - break; - case 3: - factor = ReplicationFactor.THREE; - break; - default: - throw new IllegalArgumentException("Illegal replication factor:" - + replicationFactor); - } - } - } - - private void usage() { - System.out.println("Options supported are:"); - System.out.println("-numOfThreads <value> " - + "number of threads to be launched for the run."); - System.out.println("-validateWrites " - + "do random validation of data written into ozone, " + - "only subset of data is validated."); - System.out.println("-jsonDir " - + "directory where json is created."); - System.out.println("-mode [online | offline] " - + "specifies the mode in which Freon should run."); - System.out.println("-source <url> " - + "specifies the URL of s3 commoncrawl warc file to " + - "be used when the mode is online."); - System.out.println("-numOfVolumes <value> " - + "specifies number of Volumes to be created in offline mode"); - System.out.println("-numOfBuckets <value> " - + "specifies number of Buckets to be created per Volume " + - "in offline mode"); - System.out.println("-numOfKeys <value> " - + "specifies number of Keys to be created per Bucket " + - "in offline mode"); - System.out.println("-keySize <value> " - + "specifies the size of Key in bytes to be created in offline mode"); - System.out.println("-help " - + "prints usage."); - System.out.println(); - } - - /** - * Adds ShutdownHook to 
print statistics. - */ - private void addShutdownHook() { - Runtime.getRuntime().addShutdownHook( - new Thread(() -> printStats(System.out))); - } - - private Thread getProgressBarThread() { - Supplier<Long> currentValue; - long maxValue; - - if (mode.equals("online")) { - throw new UnsupportedOperationException("Not yet implemented."); - } else { - currentValue = () -> numberOfKeysAdded.get(); - maxValue = Long.parseLong(numOfVolumes) * - Long.parseLong(numOfBuckets) * - Long.parseLong(numOfKeys); - } - Thread progressBarThread = new Thread( - new ProgressBar(System.out, currentValue, maxValue)); - progressBarThread.setName("ProgressBar"); - return progressBarThread; - } - - /** - * Prints stats of {@link Freon} run to the PrintStream. - * - * @param out PrintStream - */ - private void printStats(PrintStream out) { - long endTime = System.nanoTime() - startTime; - String execTime = DurationFormatUtils - .formatDuration(TimeUnit.NANOSECONDS.toMillis(endTime), - DURATION_FORMAT); - - long volumeTime = TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get()) - / threadPoolSize; - String prettyAverageVolumeTime = - DurationFormatUtils.formatDuration(volumeTime, DURATION_FORMAT); - - long bucketTime = TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get()) - / threadPoolSize; - String prettyAverageBucketTime = - DurationFormatUtils.formatDuration(bucketTime, DURATION_FORMAT); - - long averageKeyCreationTime = - TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get()) - / threadPoolSize; - String prettyAverageKeyCreationTime = DurationFormatUtils - .formatDuration(averageKeyCreationTime, DURATION_FORMAT); - - long averageKeyWriteTime = - TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()) / threadPoolSize; - String prettyAverageKeyWriteTime = DurationFormatUtils - .formatDuration(averageKeyWriteTime, DURATION_FORMAT); - - out.println(); - out.println("***************************************************"); - out.println("Status: " + (exception ? 
"Failed" : "Success")); - out.println("Git Base Revision: " + VersionInfo.getRevision()); - out.println("Number of Volumes created: " + numberOfVolumesCreated); - out.println("Number of Buckets created: " + numberOfBucketsCreated); - out.println("Number of Keys added: " + numberOfKeysAdded); - out.println("Ratis replication factor: " + factor.name()); - out.println("Ratis replication type: " + type.name()); - out.println( - "Average Time spent in volume creation: " + prettyAverageVolumeTime); - out.println( - "Average Time spent in bucket creation: " + prettyAverageBucketTime); - out.println( - "Average Time spent in key creation: " + prettyAverageKeyCreationTime); - out.println( - "Average Time spent in key write: " + prettyAverageKeyWriteTime); - out.println("Total bytes written: " + totalBytesWritten); - if (validateWrites) { - out.println("Total number of writes validated: " + - totalWritesValidated); - out.println("Writes validated: " + - (100.0 * totalWritesValidated / numberOfKeysAdded.get()) - + " %"); - out.println("Successful validation: " + - writeValidationSuccessCount); - out.println("Unsuccessful validation: " + - writeValidationFailureCount); - } - out.println("Total Execution time: " + execTime); - out.println("***************************************************"); - - if (jsonDir != null) { - - String[][] quantileTime = - new String[FreonOps.values().length][QUANTILES + 1]; - String[] deviations = new String[FreonOps.values().length]; - String[] means = new String[FreonOps.values().length]; - for (FreonOps ops : FreonOps.values()) { - Snapshot snapshot = histograms.get(ops.ordinal()).getSnapshot(); - for (int i = 0; i <= QUANTILES; i++) { - quantileTime[ops.ordinal()][i] = DurationFormatUtils.formatDuration( - TimeUnit.NANOSECONDS - .toMillis((long) snapshot.getValue((1.0 / QUANTILES) * i)), - DURATION_FORMAT); - } - deviations[ops.ordinal()] = DurationFormatUtils.formatDuration( - TimeUnit.NANOSECONDS.toMillis((long) snapshot.getStdDev()), - 
DURATION_FORMAT); - means[ops.ordinal()] = DurationFormatUtils.formatDuration( - TimeUnit.NANOSECONDS.toMillis((long) snapshot.getMean()), - DURATION_FORMAT); - } - - FreonJobInfo jobInfo = new FreonJobInfo().setExecTime(execTime) - .setGitBaseRevision(VersionInfo.getRevision()) - .setMeanVolumeCreateTime(means[FreonOps.VOLUME_CREATE.ordinal()]) - .setDeviationVolumeCreateTime( - deviations[FreonOps.VOLUME_CREATE.ordinal()]) - .setTenQuantileVolumeCreateTime( - quantileTime[FreonOps.VOLUME_CREATE.ordinal()]) - .setMeanBucketCreateTime(means[FreonOps.BUCKET_CREATE.ordinal()]) - .setDeviationBucketCreateTime( - deviations[FreonOps.BUCKET_CREATE.ordinal()]) - .setTenQuantileBucketCreateTime( - quantileTime[FreonOps.BUCKET_CREATE.ordinal()]) - .setMeanKeyCreateTime(means[FreonOps.KEY_CREATE.ordinal()]) - .setDeviationKeyCreateTime(deviations[FreonOps.KEY_CREATE.ordinal()]) - .setTenQuantileKeyCreateTime( - quantileTime[FreonOps.KEY_CREATE.ordinal()]) - .setMeanKeyWriteTime(means[FreonOps.KEY_WRITE.ordinal()]) - .setDeviationKeyWriteTime(deviations[FreonOps.KEY_WRITE.ordinal()]) - .setTenQuantileKeyWriteTime( - quantileTime[FreonOps.KEY_WRITE.ordinal()]); - String jsonName = - new SimpleDateFormat("yyyyMMddHHmmss").format(Time.now()) + ".json"; - String jsonPath = jsonDir + "/" + jsonName; - FileOutputStream os = null; - try { - os = new FileOutputStream(jsonPath); - ObjectMapper mapper = new ObjectMapper(); - mapper.setVisibility(PropertyAccessor.FIELD, - JsonAutoDetect.Visibility.ANY); - ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter(); - writer.writeValue(os, jobInfo); - } catch (FileNotFoundException e) { - out.println("Json File could not be created for the path: " + jsonPath); - out.println(e); - } catch (IOException e) { - out.println("Json object could not be created"); - out.println(e); - } finally { - try { - if (os != null) { - os.close(); - } - } catch (IOException e) { - LOG.warn("Could not close the output stream for json", e); - } - } - } - } 
- - /** - * Returns the number of volumes created. - * @return volume count. - */ - @VisibleForTesting - int getNumberOfVolumesCreated() { - return numberOfVolumesCreated.get(); - } - - /** - * Returns the number of buckets created. - * @return bucket count. - */ - @VisibleForTesting - int getNumberOfBucketsCreated() { - return numberOfBucketsCreated.get(); - } - - /** - * Returns the number of keys added. - * @return keys count. - */ - @VisibleForTesting - long getNumberOfKeysAdded() { - return numberOfKeysAdded.get(); - } - - /** - * Returns true if random validation of write is enabled. - * @return validateWrites - */ - @VisibleForTesting - boolean getValidateWrites() { - return validateWrites; - } - - /** - * Returns the number of keys validated. - * @return validated key count. - */ - @VisibleForTesting - long getTotalKeysValidated() { - return totalWritesValidated; - } - - /** - * Returns the number of successful validation. - * @return successful validation count. - */ - @VisibleForTesting - long getSuccessfulValidationCount() { - return writeValidationSuccessCount; - } - - /** - * Returns the number of unsuccessful validation. - * @return unsuccessful validation count. - */ - @VisibleForTesting - long getUnsuccessfulValidationCount() { - return writeValidationFailureCount; - } - - /** - * Returns the length of the common key value initialized. - * @return key value length initialized. - */ - @VisibleForTesting - long getKeyValueLength(){ - return keyValue.length; - } - - /** - * Wrapper to hold ozone key-value pair. - */ - private static class KeyValue { - - /** - * Bucket name associated with the key-value. - */ - private OzoneBucket bucket; - /** - * Key name associated with the key-value. - */ - private String key; - /** - * Value associated with the key-value. - */ - private byte[] value; - - /** - * Constructs a new ozone key-value pair. 
- * - * @param key key part - * @param value value part - */ - KeyValue(OzoneBucket bucket, String key, byte[] value) { - this.bucket = bucket; - this.key = key; - this.value = value; - } - } - - private class OfflineProcessor implements Runnable { - - private int totalBuckets; - private int totalKeys; - private String volumeName; - - OfflineProcessor(String volumeName) { - this.totalBuckets = Integer.parseInt(numOfBuckets); - this.totalKeys = Integer.parseInt(numOfKeys); - this.volumeName = volumeName; - } - - @Override - public void run() { - LOG.trace("Creating volume: {}", volumeName); - long start = System.nanoTime(); - OzoneVolume volume; - try { - objectStore.createVolume(volumeName); - long volumeCreationDuration = System.nanoTime() - start; - volumeCreationTime.getAndAdd(volumeCreationDuration); - histograms.get(FreonOps.VOLUME_CREATE.ordinal()) - .update(volumeCreationDuration); - numberOfVolumesCreated.getAndIncrement(); - volume = objectStore.getVolume(volumeName); - } catch (IOException e) { - exception = true; - LOG.error("Could not create volume", e); - return; - } - - Long threadKeyWriteTime = 0L; - for (int j = 0; j < totalBuckets; j++) { - String bucketName = "bucket-" + j + "-" + - RandomStringUtils.randomNumeric(5); - try { - LOG.trace("Creating bucket: {} in volume: {}", - bucketName, volume.getName()); - start = System.nanoTime(); - volume.createBucket(bucketName); - long bucketCreationDuration = System.nanoTime() - start; - histograms.get(FreonOps.BUCKET_CREATE.ordinal()) - .update(bucketCreationDuration); - bucketCreationTime.getAndAdd(bucketCreationDuration); - numberOfBucketsCreated.getAndIncrement(); - OzoneBucket bucket = volume.getBucket(bucketName); - for (int k = 0; k < totalKeys; k++) { - String key = "key-" + k + "-" + - RandomStringUtils.randomNumeric(5); - byte[] randomValue = - DFSUtil.string2Bytes(UUID.randomUUID().toString()); - try { - LOG.trace("Adding key: {} in bucket: {} of volume: {}", - key, bucket, volume); - long 
keyCreateStart = System.nanoTime(); - OzoneOutputStream os = - bucket.createKey(key, keySize, type, factor); - long keyCreationDuration = System.nanoTime() - keyCreateStart; - histograms.get(FreonOps.KEY_CREATE.ordinal()) - .update(keyCreationDuration); - keyCreationTime.getAndAdd(keyCreationDuration); - long keyWriteStart = System.nanoTime(); - os.write(keyValue); - os.write(randomValue); - os.close(); - long keyWriteDuration = System.nanoTime() - keyWriteStart; - threadKeyWriteTime += keyWriteDuration; - histograms.get(FreonOps.KEY_WRITE.ordinal()) - .update(keyWriteDuration); - totalBytesWritten.getAndAdd(keySize); - numberOfKeysAdded.getAndIncrement(); - if (validateWrites) { - byte[] value = ArrayUtils.addAll(keyValue, randomValue); - boolean validate = validationQueue.offer( - new KeyValue(bucket, key, value)); - if (validate) { - LOG.trace("Key {}, is queued for validation.", key); - } - } - } catch (Exception e) { - exception = true; - LOG.error("Exception while adding key: {} in bucket: {}" + - " of volume: {}.", key, bucket, volume, e); - } - } - } catch (Exception e) { - exception = true; - LOG.error("Exception while creating bucket: {}" + - " in volume: {}.", bucketName, volume, e); - } - } - - keyWriteTime.getAndAdd(threadKeyWriteTime); - } - - } - - private final class FreonJobInfo { - - private String status; - private String gitBaseRevision; - private String jobStartTime; - private String numOfVolumes; - private String numOfBuckets; - private String numOfKeys; - private String numOfThreads; - private String mode; - private String dataWritten; - private String execTime; - private String replicationFactor; - private String replicationType; - - private int keySize; - - private String totalThroughputPerSecond; - - private String meanVolumeCreateTime; - private String deviationVolumeCreateTime; - private String[] tenQuantileVolumeCreateTime; - - private String meanBucketCreateTime; - private String deviationBucketCreateTime; - private String[] 
tenQuantileBucketCreateTime; - - private String meanKeyCreateTime; - private String deviationKeyCreateTime; - private String[] tenQuantileKeyCreateTime; - - private String meanKeyWriteTime; - private String deviationKeyWriteTime; - private String[] tenQuantileKeyWriteTime; - - private FreonJobInfo() { - this.status = exception ? "Failed" : "Success"; - this.numOfVolumes = Freon.this.numOfVolumes; - this.numOfBuckets = Freon.this.numOfBuckets; - this.numOfKeys = Freon.this.numOfKeys; - this.numOfThreads = Freon.this.numOfThreads; - this.keySize = Freon.this.keySize; - this.mode = Freon.this.mode; - this.jobStartTime = Time.formatTime(Freon.this.jobStartTime); - this.replicationFactor = Freon.this.factor.name(); - this.replicationType = Freon.this.type.name(); - - long totalBytes = - Long.parseLong(numOfVolumes) * Long.parseLong(numOfBuckets) * Long - .parseLong(numOfKeys) * keySize; - this.dataWritten = getInStorageUnits((double) totalBytes); - this.totalThroughputPerSecond = getInStorageUnits( - (totalBytes * 1.0) / TimeUnit.NANOSECONDS - .toSeconds(Freon.this.keyWriteTime.get() / threadPoolSize)); - } - - private String getInStorageUnits(Double value) { - double size; - OzoneQuota.Units unit; - if ((long) (value / OzoneConsts.TB) != 0) { - size = value / OzoneConsts.TB; - unit = OzoneQuota.Units.TB; - } else if ((long) (value / OzoneConsts.GB) != 0) { - size = value / OzoneConsts.GB; - unit = OzoneQuota.Units.GB; - } else if ((long) (value / OzoneConsts.MB) != 0) { - size = value / OzoneConsts.MB; - unit = OzoneQuota.Units.MB; - } else if ((long) (value / OzoneConsts.KB) != 0) { - size = value / OzoneConsts.KB; - unit = OzoneQuota.Units.KB; - } else { - size = value; - unit = OzoneQuota.Units.BYTES; - } - return size + " " + unit; - } - - public FreonJobInfo setGitBaseRevision(String gitBaseRevisionVal) { - gitBaseRevision = gitBaseRevisionVal; - return this; - } - - public FreonJobInfo setExecTime(String execTimeVal) { - execTime = execTimeVal; - return this; - } 
-
-    // Fluent setters: each stores the given value and returns this object
-    // so that calls can be chained. Parameter names match the field they set.
-
-    public FreonJobInfo setMeanKeyWriteTime(String meanKeyWriteTimeVal) {
-      this.meanKeyWriteTime = meanKeyWriteTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setDeviationKeyWriteTime(
-        String deviationKeyWriteTimeVal) {
-      this.deviationKeyWriteTime = deviationKeyWriteTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setTenQuantileKeyWriteTime(
-        String[] tenQuantileKeyWriteTimeVal) {
-      this.tenQuantileKeyWriteTime = tenQuantileKeyWriteTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setMeanKeyCreateTime(String meanKeyCreateTimeVal) {
-      this.meanKeyCreateTime = meanKeyCreateTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setDeviationKeyCreateTime(
-        String deviationKeyCreateTimeVal) {
-      this.deviationKeyCreateTime = deviationKeyCreateTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setTenQuantileKeyCreateTime(
-        String[] tenQuantileKeyCreateTimeVal) {
-      this.tenQuantileKeyCreateTime = tenQuantileKeyCreateTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setMeanBucketCreateTime(
-        String meanBucketCreateTimeVal) {
-      this.meanBucketCreateTime = meanBucketCreateTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setDeviationBucketCreateTime(
-        String deviationBucketCreateTimeVal) {
-      this.deviationBucketCreateTime = deviationBucketCreateTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setTenQuantileBucketCreateTime(
-        String[] tenQuantileBucketCreateTimeVal) {
-      this.tenQuantileBucketCreateTime = tenQuantileBucketCreateTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setMeanVolumeCreateTime(
-        String meanVolumeCreateTimeVal) {
-      this.meanVolumeCreateTime = meanVolumeCreateTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setDeviationVolumeCreateTime(
-        String deviationVolumeCreateTimeVal) {
-      this.deviationVolumeCreateTime = deviationVolumeCreateTimeVal;
-      return this;
-    }
-
-    public FreonJobInfo setTenQuantileVolumeCreateTime(
-        String[] tenQuantileVolumeCreateTimeVal) {
-      this.tenQuantileVolumeCreateTime = tenQuantileVolumeCreateTimeVal;
-      return this;
-    }
-
-    // Plain getters: each returns the stored field unchanged.
-
-    public String getJobStartTime() {
-      return jobStartTime;
-    }
-
-    public String getNumOfVolumes() {
-      return numOfVolumes;
-    }
-
-    public String getNumOfBuckets() {
-      return numOfBuckets;
-    }
-
-    public String getNumOfKeys() {
-      return numOfKeys;
-    }
-
-    public String getNumOfThreads() {
-      return numOfThreads;
-    }
-
-    public String getMode() {
-      return mode;
-    }
-
-    public String getExecTime() {
-      return execTime;
-    }
-
-    public String getReplicationFactor() {
-      return replicationFactor;
-    }
-
-    public String getReplicationType() {
-      return replicationType;
-    }
-
-    public String getStatus() {
-      return status;
-    }
-
-    public int getKeySize() {
-      return keySize;
-    }
-
-    public String getGitBaseRevision() {
-      return gitBaseRevision;
-    }
-
-    public String getDataWritten() {
-      return dataWritten;
-    }
-
-    public String getTotalThroughputPerSecond() {
-      return totalThroughputPerSecond;
-    }
-
-    public String getMeanVolumeCreateTime() {
-      return meanVolumeCreateTime;
-    }
-
-    public String getDeviationVolumeCreateTime() {
-      return deviationVolumeCreateTime;
-    }
-
-    public String[] getTenQuantileVolumeCreateTime() {
-      return tenQuantileVolumeCreateTime;
-    }
-
-    public String getMeanBucketCreateTime() {
-      return meanBucketCreateTime;
-    }
-
-    public String getDeviationBucketCreateTime() {
-      return deviationBucketCreateTime;
-    }
-
-    public String[] getTenQuantileBucketCreateTime() {
-      return tenQuantileBucketCreateTime;
-    }
-
-    public String getMeanKeyCreateTime() {
-      return meanKeyCreateTime;
-    }
-
-    public String getDeviationKeyCreateTime() {
-      return deviationKeyCreateTime;
-    }
-
-    public String[] getTenQuantileKeyCreateTime() {
-      return tenQuantileKeyCreateTime;
-    }
-
-    public String getMeanKeyWriteTime() {
-      return meanKeyWriteTime;
-    }
-
-    public String getDeviationKeyWriteTime() {
-      return deviationKeyWriteTime;
-    }
-
-    public String[] getTenQuantileKeyWriteTime() {
-      return tenQuantileKeyWriteTime;
-    }
-  }
-
-  private
class ProgressBar implements Runnable {
-
-    private static final long REFRESH_INTERVAL = 1000L;
-    /** Number of character cells between the two '|' delimiters. */
-    private static final int BAR_WIDTH = 100;
-
-    private final PrintStream stream;
-    private final Supplier<Long> currentValue;
-    private final long maxValue;
-
-    /**
-     * @param stream stream the bar is drawn on
-     * @param currentValue supplies the progress made so far
-     * @param maxValue value at which the work is considered finished
-     */
-    ProgressBar(PrintStream stream, Supplier<Long> currentValue,
-        long maxValue) {
-      this.stream = stream;
-      this.currentValue = currentValue;
-      this.maxValue = maxValue;
-    }
-
-    /**
-     * Redraws the bar every REFRESH_INTERVAL milliseconds until the supplied
-     * value reaches maxValue or the enclosing job is marked completed, then
-     * prints either the full bar or a failure notice.
-     */
-    @Override
-    public void run() {
-      try {
-        stream.println();
-        long value;
-        while ((value = currentValue.get()) < maxValue) {
-          print(value);
-          if (completed) {
-            break;
-          }
-          Thread.sleep(REFRESH_INTERVAL);
-        }
-        if (exception) {
-          stream.println();
-          stream.println("Incomplete termination, " +
-              "check log for exception.");
-        } else {
-          print(maxValue);
-        }
-        stream.println();
-      } catch (InterruptedException e) {
-        // Stop drawing, but keep the interrupt visible to whoever owns
-        // this thread instead of silently discarding it.
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    /**
-     * Given current value prints the progress bar.
-     *
-     * @param value progress made so far, on the same scale as maxValue
-     */
-    private void print(long value) {
-      stream.print('\r');
-      double percent = 100.0 * value / maxValue;
-      // Clamp so the bar is always exactly BAR_WIDTH cells wide: empty at
-      // 0% and full at 100% (the previous "i <= percent" loop drew one cell
-      // too many). A NaN percentage (maxValue == 0) yields an empty bar.
-      int filled = (int) Math.max(0, Math.min(BAR_WIDTH, percent));
-      StringBuilder sb = new StringBuilder();
-      sb.append(" " + String.format("%.2f", percent) + "% |");
-
-      for (int i = 0; i < filled; i++) {
-        // U+2588 FULL BLOCK, written as an escape so that it survives
-        // re-encoding of the source file.
-        sb.append('\u2588');
-      }
-      for (int j = filled; j < BAR_WIDTH; j++) {
-        sb.append(' ');
-      }
-      sb.append("| ");
-      sb.append(value + "/" + maxValue);
-      long timeInSec = TimeUnit.SECONDS.convert(
-          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-      String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
-          (timeInSec % 3600) / 60, timeInSec % 60);
-      sb.append(" Time: " + timeToPrint);
-      stream.print(sb);
-    }
-  }
-
-  /**
-   * Validates the write done in ozone cluster.
-   */
-  private class Validator implements Runnable {
-
-    /**
-     * Until the job is marked completed, takes written key/value pairs off
-     * the validation queue, reads each key back from its bucket and counts
-     * whether the bytes match what was written.
-     */
-    @Override
-    public void run() {
-      while (!completed) {
-        try {
-          KeyValue kv = validationQueue.poll(5, TimeUnit.SECONDS);
-          if (kv != null) {
-            byte[] value = new byte[kv.value.length];
-            int length = 0;
-            // try-with-resources: the key stream was previously never closed.
-            try (OzoneInputStream is = kv.bucket.readKey(kv.key)) {
-              // A single read() may legally return fewer bytes than asked
-              // for; keep reading until the buffer is full or the stream
-              // ends, so a short read is not reported as corruption.
-              while (length < value.length) {
-                int read = is.read(value, length, value.length - length);
-                if (read < 0) {
-                  break;
-                }
-                length += read;
-              }
-            }
-            totalWritesValidated++;
-            if (length == kv.value.length && Arrays.equals(value, kv.value)) {
-              writeValidationSuccessCount++;
-            } else {
-              writeValidationFailureCount++;
-              LOG.warn("Data validation error for key {}/{}/{}",
-                  kv.bucket.getVolumeName(), kv.bucket, kv.key);
-              LOG.warn("Expected: {}, Actual: {}",
-                  DFSUtil.bytes2String(kv.value),
-                  DFSUtil.bytes2String(value));
-            }
-          }
-        } catch (IOException ex) {
-          // Pass the exception itself so the stack trace is logged too.
-          LOG.error("Exception while validating write: " + ex.getMessage(),
-              ex);
-        } catch (InterruptedException ex) {
-          // Restore the interrupt flag and stop; looping on would make
-          // every following poll() fail immediately.
-          Thread.currentThread().interrupt();
-          LOG.error("Exception while validating write: " + ex.getMessage());
-          return;
-        }
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/package-info.java
deleted file mode 100644
index 4d74f48..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.tools; - -/** - This package contains class used for testing and benchmarking ozone cluster. - */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java deleted file mode 100644 index 7dd9a33..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.web;
-
-import org.apache.hadoop.ozone.client.rest.OzoneExceptionMapper;
-import org.apache.hadoop.ozone.web.handlers.BucketHandler;
-import org.apache.hadoop.ozone.web.handlers.KeyHandler;
-import org.apache.hadoop.ozone.web.handlers.ServiceFilter;
-import org.apache.hadoop.ozone.web.handlers.VolumeHandler;
-import org.apache.hadoop.ozone.web.messages.LengthInputStreamMessageBodyWriter;
-import org.apache.hadoop.ozone.web.messages.StringMessageBodyWriter;
-
-import javax.ws.rs.core.Application;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * JAX-RS application descriptor for the Ozone object store: lists the
- * handler, exception-mapper and message-body-writer classes, plus the
- * service filter entry.
- */
-public class ObjectStoreApplication extends Application {
-
-  /** Creates the application descriptor; no state is initialised. */
-  public ObjectStoreApplication() {
-  }
-
-  /**
-   * @return a new mutable set holding the three request handlers, the
-   *         exception mapper and the two message body writers
-   */
-  @Override
-  public Set<Class<?>> getClasses() {
-    Set<Class<?>> components = new HashSet<>();
-    // Request handlers.
-    components.add(VolumeHandler.class);
-    components.add(BucketHandler.class);
-    components.add(KeyHandler.class);
-    // Providers.
-    components.add(OzoneExceptionMapper.class);
-    components.add(StringMessageBodyWriter.class);
-    components.add(LengthInputStreamMessageBodyWriter.class);
-    return components;
-  }
-
-  /**
-   * @return a new mutable set whose only element is the ServiceFilter class
-   *         object
-   */
-  @Override
-  public Set<Object> getSingletons() {
-    // NOTE(review): this registers the Class object, not a ServiceFilter
-    // instance - confirm that this is what the JAX-RS runtime expects.
-    Set<Object> singletons = new HashSet<>();
-    singletons.add(ServiceFilter.class);
-    return singletons;
-  }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/OzoneHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/OzoneHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/OzoneHttpServer.java
deleted file mode 100644
index b6a8fda..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/OzoneHttpServer.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF)
under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.web; - -import com.google.common.base.Optional; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.http.HttpConfig; -import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.ozone.client.OzoneClientUtils; -import org.eclipse.jetty.webapp.WebAppContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.http.HttpServlet; -import java.io.IOException; -import java.net.InetSocketAddress; - -/** - * Base class for HTTP server of the Ozone related components. 
- */
-public abstract class OzoneHttpServer {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(OzoneHttpServer.class);
-
-  // Stays null when the endpoint is disabled through getEnabledKey().
-  private HttpServer2 httpServer;
-  private final Configuration conf;
-
-  private InetSocketAddress httpAddress;
-  private InetSocketAddress httpsAddress;
-
-  // Stays null when the endpoint is disabled through getEnabledKey().
-  private HttpConfig.Policy policy;
-
-  private final String name;
-
-  /**
-   * Builds, but does not start, the HTTP server when the endpoint is enabled
-   * in the configuration; otherwise no server is created.
-   *
-   * @param conf configuration the bind addresses and options are read from
-   * @param name component name passed to the server builder and used in logs
-   * @throws IOException if the server cannot be built
-   */
-  public OzoneHttpServer(Configuration conf, String name) throws IOException {
-    this.name = name;
-    this.conf = conf;
-    if (isEnabled()) {
-      policy = DFSUtil.getHttpPolicy(conf);
-      if (policy.isHttpEnabled()) {
-        this.httpAddress = getHttpBindAddress();
-      }
-      if (policy.isHttpsEnabled()) {
-        this.httpsAddress = getHttpsBindAddress();
-      }
-      HttpServer2.Builder builder =
-          DFSUtil.httpServerTemplateForNNAndJN(conf, this.httpAddress,
-              this.httpsAddress, name, getSpnegoPrincipal(), getKeytabFile());
-
-      final boolean xFrameEnabled = conf.getBoolean(
-          DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED,
-          DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED_DEFAULT);
-
-      final String xFrameOptionValue = conf.getTrimmed(
-          DFSConfigKeys.DFS_XFRAME_OPTION_VALUE,
-          DFSConfigKeys.DFS_XFRAME_OPTION_VALUE_DEFAULT);
-
-      builder.configureXFrame(xFrameEnabled).setXFrameOption(xFrameOptionValue);
-
-      httpServer = builder.build();
-    }
-  }
-
-  /**
-   * Add a servlet to OzoneHttpServer. Must only be called when the endpoint
-   * is enabled, because no server is created otherwise.
-   *
-   * @param servletName The name of the servlet
-   * @param pathSpec The path spec for the servlet
-   * @param clazz The servlet class
-   */
-  protected void addServlet(String servletName, String pathSpec,
-      Class<? extends HttpServlet> clazz) {
-    httpServer.addServlet(servletName, pathSpec, clazz);
-  }
-
-  /**
-   * Returns the WebAppContext associated with this HttpServer. Must only be
-   * called when the endpoint is enabled, because no server is created
-   * otherwise.
-   *
-   * @return WebAppContext
-   */
-  protected WebAppContext getWebAppContext() {
-    return httpServer.getWebAppContext();
-  }
-
-  /**
-   * Resolves a bind address from the configuration. The host comes from
-   * bindHostKey, then from the host part of addressKey, then from
-   * bindHostDefault; the port comes from addressKey, then bindPortdefault.
-   *
-   * @param bindHostKey configuration key holding the bind host
-   * @param addressKey configuration key holding a host:port address
-   * @param bindHostDefault host used when neither key provides one
-   * @param bindPortdefault port used when addressKey provides none
-   * @return the resolved socket address
-   */
-  protected InetSocketAddress getBindAddress(String bindHostKey,
-      String addressKey, String bindHostDefault, int bindPortdefault) {
-    final Optional<String> bindHost =
-        OzoneClientUtils.getHostNameFromConfigKeys(conf, bindHostKey);
-
-    final Optional<Integer> addressPort =
-        OzoneClientUtils.getPortNumberFromConfigKeys(conf, addressKey);
-
-    final Optional<String> addressHost =
-        OzoneClientUtils.getHostNameFromConfigKeys(conf, addressKey);
-
-    String hostName = bindHost.or(addressHost).or(bindHostDefault);
-
-    return NetUtils.createSocketAddr(
-        hostName + ":" + addressPort.or(bindPortdefault));
-  }
-
-  /**
-   * Retrieve the socket address that should be used by clients to connect
-   * to the HTTPS web interface.
-   *
-   * @return Target InetSocketAddress for the Ozone HTTPS endpoint.
-   */
-  public InetSocketAddress getHttpsBindAddress() {
-    return getBindAddress(getHttpsBindHostKey(), getHttpsAddressKey(),
-        getBindHostDefault(), getHttpsBindPortDefault());
-  }
-
-  /**
-   * Retrieve the socket address that should be used by clients to connect
-   * to the HTTP web interface.
-   *
-   * @return Target InetSocketAddress for the Ozone HTTP endpoint.
-   */
-  public InetSocketAddress getHttpBindAddress() {
-    return getBindAddress(getHttpBindHostKey(), getHttpAddressKey(),
-        getBindHostDefault(), getHttpBindPortDefault());
-  }
-
-  /**
-   * Starts the server, if one was built and the endpoint is still enabled,
-   * and then publishes the real listen addresses.
-   *
-   * @throws IOException if the server fails to start
-   */
-  public void start() throws IOException {
-    if (httpServer != null && isEnabled()) {
-      httpServer.start();
-      updateConnectorAddress();
-    }
-  }
-
-  private boolean isEnabled() {
-    return conf.getBoolean(getEnabledKey(), true);
-  }
-
-  /**
-   * Stops the server if one was built.
-   *
-   * @throws Exception if the server fails to stop
-   */
-  public void stop() throws Exception {
-    if (httpServer != null) {
-      httpServer.stop();
-    }
-  }
-
-  /**
-   * Update the configured listen address based on the real port
-   * (e.g. replace :0 with real port). Does nothing when no server was built.
-   */
-  public void updateConnectorAddress() {
-    if (httpServer == null || policy == null) {
-      // Endpoint disabled: there is no connector to read an address from.
-      return;
-    }
-    int connIdx = 0;
-    if (policy.isHttpEnabled()) {
-      httpAddress = httpServer.getConnectorAddress(connIdx++);
-      String realAddress = NetUtils.getHostPortString(httpAddress);
-      conf.set(getHttpAddressKey(), realAddress);
-      // Locale.ROOT keeps the upper-casing independent of the JVM locale.
-      LOG.info(
-          String.format("HTTP server of %s is listening at http://%s",
-              name.toUpperCase(java.util.Locale.ROOT), realAddress));
-    }
-
-    if (policy.isHttpsEnabled()) {
-      httpsAddress = httpServer.getConnectorAddress(connIdx);
-      String realAddress = NetUtils.getHostPortString(httpsAddress);
-      conf.set(getHttpsAddressKey(), realAddress);
-      LOG.info(
-          String.format("HTTPS server of %s is listening at https://%s",
-              name.toUpperCase(java.util.Locale.ROOT), realAddress));
-    }
-  }
-
-  public InetSocketAddress getHttpAddress() {
-    return httpAddress;
-  }
-
-  public InetSocketAddress getHttpsAddress() {
-    return httpsAddress;
-  }
-
-  protected abstract String getHttpAddressKey();
-
-  protected abstract String getHttpsAddressKey();
-
-  protected abstract String getHttpBindHostKey();
-
-  protected abstract String getHttpsBindHostKey();
-
-  protected abstract String getBindHostDefault();
-
-  protected abstract int getHttpBindPortDefault();
-
-  protected abstract int getHttpsBindPortDefault();
-
-  protected abstract String getKeytabFile();
-
-  protected abstract String getSpnegoPrincipal();
-
-  protected abstract String getEnabledKey();
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
