http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java deleted file mode 100644 index a430400..0000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.apache.hadoop.metrics2.lib.MutableRate; - -/** - * The client metrics for the Storage Container protocol. - */ [email protected] -@Metrics(about = "Storage Container Client Metrics", context = "dfs") -public class XceiverClientMetrics { - public static final String SOURCE_NAME = XceiverClientMetrics.class - .getSimpleName(); - - private @Metric MutableCounterLong pendingOps; - private MutableCounterLong[] pendingOpsArray; - private MutableRate[] containerOpsLatency; - private MetricsRegistry registry; - - public XceiverClientMetrics() { - int numEnumEntries = ContainerProtos.Type.values().length; - this.registry = new MetricsRegistry(SOURCE_NAME); - - this.pendingOpsArray = new MutableCounterLong[numEnumEntries]; - this.containerOpsLatency = new MutableRate[numEnumEntries]; - for (int i = 0; i < numEnumEntries; i++) { - pendingOpsArray[i] = registry.newCounter( - "numPending" + ContainerProtos.Type.forNumber(i + 1), - "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops", - (long) 0); - - containerOpsLatency[i] = registry.newRate( - ContainerProtos.Type.forNumber(i + 1) + "Latency", - "latency of " + ContainerProtos.Type.forNumber(i + 1) - + " ops"); - } - } - - public static XceiverClientMetrics create() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - return ms.register(SOURCE_NAME, "Storage Container Client Metrics", - new XceiverClientMetrics()); - } - - public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) { - pendingOps.incr(); - pendingOpsArray[type.ordinal()].incr(); - } - - public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) { - pendingOps.incr(-1); - pendingOpsArray[type.ordinal()].incr(-1); - } - - public void addContainerOpsLatency(ContainerProtos.Type type, - long latencyNanos) { - containerOpsLatency[type.ordinal()].add(latencyNanos); - } - - public long getContainerOpsMetrics(ContainerProtos.Type type) { - return pendingOpsArray[type.ordinal()].value(); - } - - public void unRegister() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - ms.unregisterSource(SOURCE_NAME); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java deleted file mode 100644 index 0d301d9..0000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm; - -import org.apache.hadoop.hdds.HddsUtils; -import org.apache.hadoop.io.MultipleIOException; -import org.apache.ratis.retry.RetryPolicy; -import org.apache.ratis.shaded.com.google.protobuf - .InvalidProtocolBufferException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.client.HddsClientUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.ratis.RatisHelper; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.rpc.RpcType; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.util.CheckedBiConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.atomic.AtomicReference; - -/** - * An abstract implementation of {@link XceiverClientSpi} using Ratis. - * The underlying RPC mechanism can be chosen via the constructor. - */ -public final class XceiverClientRatis extends XceiverClientSpi { - static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); - - public static XceiverClientRatis newXceiverClientRatis( - Pipeline pipeline, Configuration ozoneConf) { - final String rpcType = ozoneConf.get( - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, - ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); - final int maxOutstandingRequests = - HddsClientUtils.getMaxOutstandingRequests(ozoneConf); - final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); - return new XceiverClientRatis(pipeline, - SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests, - retryPolicy); - } - - private final Pipeline pipeline; - private final RpcType rpcType; - private final AtomicReference<RaftClient> client = new AtomicReference<>(); - private final int maxOutstandingRequests; - private final RetryPolicy retryPolicy; - - /** - * Constructs a client. - */ - private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, - int maxOutStandingChunks, RetryPolicy retryPolicy) { - super(); - this.pipeline = pipeline; - this.rpcType = rpcType; - this.maxOutstandingRequests = maxOutStandingChunks; - this.retryPolicy = retryPolicy; - } - - /** - * {@inheritDoc} - */ - public void createPipeline() throws IOException { - final RaftGroup group = RatisHelper.newRaftGroup(pipeline); - LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group); - callRatisRpc(pipeline.getMachines(), - (raftClient, peer) -> raftClient.groupAdd(group, peer.getId())); - } - - /** - * {@inheritDoc} - */ - public void destroyPipeline() throws IOException { - final RaftGroup group = RatisHelper.newRaftGroup(pipeline); - LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); - callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient - .groupRemove(group.getGroupId(), true, peer.getId())); - } - - /** - * Returns Ratis as pipeline Type. - * - * @return - Ratis - */ - @Override - public HddsProtos.ReplicationType getPipelineType() { - return HddsProtos.ReplicationType.RATIS; - } - - private void callRatisRpc(List<DatanodeDetails> datanodes, - CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc) - throws IOException { - if (datanodes.isEmpty()) { - return; - } - - final List<IOException> exceptions = - Collections.synchronizedList(new ArrayList<>()); - datanodes.parallelStream().forEach(d -> { - final RaftPeer p = RatisHelper.toRaftPeer(d); - try (RaftClient client = RatisHelper - .newRaftClient(rpcType, p, retryPolicy)) { - rpc.accept(client, p); - } catch (IOException ioe) { - exceptions.add( - new IOException("Failed invoke Ratis rpc " + rpc + " for " + d, - ioe)); - } - }); - if (!exceptions.isEmpty()) { - throw MultipleIOException.createIOException(exceptions); - } - } - - @Override - public Pipeline getPipeline() { - return pipeline; - } - - @Override - public void connect() throws Exception { - LOG.debug("Connecting to pipeline:{} leader:{}", - getPipeline().getId(), - RatisHelper.toRaftPeerId(pipeline.getLeader())); - // TODO : XceiverClient ratis should pass the config value of - // maxOutstandingRequests so as to set the upper bound on max no of async - // requests to be handled by raft client - if (!client.compareAndSet(null, - RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy))) { - throw new IllegalStateException("Client is already connected."); - } - } - - @Override - public void close() { - final RaftClient c = client.getAndSet(null); - if (c != null) { - try { - c.close(); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - } - - private RaftClient getClient() { - return Objects.requireNonNull(client.get(), "client is null"); - } - - private CompletableFuture<RaftClientReply> sendRequestAsync( - ContainerCommandRequestProto request) { - boolean isReadOnlyRequest = HddsUtils.isReadOnly(request); - ByteString byteString = request.toByteString(); - LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request); - return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) : - getClient().sendAsync(() -> byteString); - } - - /** - * Sends a given command to server gets a waitable future back. - * - * @param request Request - * @return Response to the command - * @throws IOException - */ - @Override - public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync( - ContainerCommandRequestProto request) { - return sendRequestAsync(request).whenComplete((reply, e) -> - LOG.debug("received reply {} for request: {} exception: {}", request, - reply, e)) - .thenApply(reply -> { - try { - return ContainerCommandResponseProto.parseFrom( - reply.getMessage().getContent()); - } catch (InvalidProtocolBufferException e) { - throw new CompletionException(e); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java deleted file mode 100644 index fed589c..0000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ /dev/null @@ -1,476 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.client; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.protocolPB - .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerData; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadContainerResponseProto; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.UUID; - -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState - .ALLOCATED; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState - .OPEN; - -/** - * This class provides the client-facing APIs of container operations. - */ -public class ContainerOperationClient implements ScmClient { - - private static final Logger LOG = - LoggerFactory.getLogger(ContainerOperationClient.class); - private static long containerSizeB = -1; - private final StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - private final XceiverClientManager xceiverClientManager; - - public ContainerOperationClient( - StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient, - XceiverClientManager xceiverClientManager) { - this.storageContainerLocationClient = storageContainerLocationClient; - this.xceiverClientManager = xceiverClientManager; - } - - /** - * Return the capacity of containers. The current assumption is that all - * containers have the same capacity. Therefore one static is sufficient for - * any container. - * @return The capacity of one container in number of bytes. - */ - public static long getContainerSizeB() { - return containerSizeB; - } - - /** - * Set the capacity of container. Should be exactly once on system start. - * @param size Capacity of one container in number of bytes. - */ - public static void setContainerSizeB(long size) { - containerSizeB = size; - } - - /** - * @inheritDoc - */ - @Override - public ContainerWithPipeline createContainer(String owner) - throws IOException { - XceiverClientSpi client = null; - try { - ContainerWithPipeline containerWithPipeline = - storageContainerLocationClient.allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), owner); - Pipeline pipeline = containerWithPipeline.getPipeline(); - client = xceiverClientManager.acquireClient(pipeline, - containerWithPipeline.getContainerInfo().getContainerID()); - - // Allocated State means that SCM has allocated this pipeline in its - // namespace. The client needs to create the pipeline on the machines - // which was choosen by the SCM. - Preconditions.checkState(pipeline.getLifeCycleState() == ALLOCATED || - pipeline.getLifeCycleState() == OPEN, "Unexpected pipeline state"); - if (pipeline.getLifeCycleState() == ALLOCATED) { - createPipeline(client, pipeline); - } - createContainer(client, - containerWithPipeline.getContainerInfo().getContainerID()); - return containerWithPipeline; - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * Create a container over pipeline specified by the SCM. - * - * @param client - Client to communicate with Datanodes. - * @param containerId - Container ID. - * @throws IOException - */ - public void createContainer(XceiverClientSpi client, - long containerId) throws IOException { - String traceID = UUID.randomUUID().toString(); - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.create, - ObjectStageChangeRequestProto.Stage.begin); - ContainerProtocolCalls.createContainer(client, containerId, traceID); - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.create, - ObjectStageChangeRequestProto.Stage.complete); - - // Let us log this info after we let SCM know that we have completed the - // creation state. - if (LOG.isDebugEnabled()) { - LOG.debug("Created container " + containerId - + " leader:" + client.getPipeline().getLeader() - + " machines:" + client.getPipeline().getMachines()); - } - } - - /** - * Creates a pipeline over the machines choosen by the SCM. - * - * @param client - Client - * @param pipeline - pipeline to be createdon Datanodes. - * @throws IOException - */ - private void createPipeline(XceiverClientSpi client, Pipeline pipeline) - throws IOException { - - Preconditions.checkNotNull(pipeline.getId(), "Pipeline " + - "name cannot be null when client create flag is set."); - - // Pipeline creation is a three step process. - // - // 1. Notify SCM that this client is doing a create pipeline on - // datanodes. - // - // 2. Talk to Datanodes to create the pipeline. - // - // 3. update SCM that pipeline creation was successful. - - // TODO: this has not been fully implemented on server side - // SCMClientProtocolServer#notifyObjectStageChange - // TODO: when implement the pipeline state machine, change - // the pipeline name (string) to pipeline id (long) - //storageContainerLocationClient.notifyObjectStageChange( - // ObjectStageChangeRequestProto.Type.pipeline, - // pipeline.getPipelineName(), - // ObjectStageChangeRequestProto.Op.create, - // ObjectStageChangeRequestProto.Stage.begin); - - client.createPipeline(); - - //storageContainerLocationClient.notifyObjectStageChange( - // ObjectStageChangeRequestProto.Type.pipeline, - // pipeline.getPipelineName(), - // ObjectStageChangeRequestProto.Op.create, - // ObjectStageChangeRequestProto.Stage.complete); - - // TODO : Should we change the state on the client side ?? - // That makes sense, but it is not needed for the client to work. - LOG.debug("Pipeline creation successful. Pipeline: {}", - pipeline.toString()); - } - - /** - * @inheritDoc - */ - @Override - public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type, - HddsProtos.ReplicationFactor factor, String owner) throws IOException { - XceiverClientSpi client = null; - try { - // allocate container on SCM. - ContainerWithPipeline containerWithPipeline = - storageContainerLocationClient.allocateContainer(type, factor, - owner); - Pipeline pipeline = containerWithPipeline.getPipeline(); - client = xceiverClientManager.acquireClient(pipeline, - containerWithPipeline.getContainerInfo().getContainerID()); - - // Allocated State means that SCM has allocated this pipeline in its - // namespace. The client needs to create the pipeline on the machines - // which was choosen by the SCM. - if (pipeline.getLifeCycleState() == ALLOCATED) { - createPipeline(client, pipeline); - } - // connect to pipeline leader and allocate container on leader datanode. - client = xceiverClientManager.acquireClient(pipeline, - containerWithPipeline.getContainerInfo().getContainerID()); - createContainer(client, - containerWithPipeline.getContainerInfo().getContainerID()); - return containerWithPipeline; - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * Returns a set of Nodes that meet a query criteria. - * - * @param nodeStatuses - Criteria that we want the node to have. - * @param queryScope - Query scope - Cluster or pool. - * @param poolName - if it is pool, a pool name is required. - * @return A set of nodes that meet the requested criteria. - * @throws IOException - */ - @Override - public List<HddsProtos.Node> queryNode(HddsProtos.NodeState - nodeStatuses, HddsProtos.QueryScope queryScope, String poolName) - throws IOException { - return storageContainerLocationClient.queryNode(nodeStatuses, queryScope, - poolName); - } - - /** - * Creates a specified replication pipeline. - */ - @Override - public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type, - HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) - throws IOException { - return storageContainerLocationClient.createReplicationPipeline(type, - factor, nodePool); - } - - @Override - public void close() { - try { - xceiverClientManager.close(); - } catch (Exception ex) { - LOG.error("Can't close " + this.getClass().getSimpleName(), ex); - } - } - - /** - * Deletes an existing container. - * - * @param containerId - ID of the container. - * @param pipeline - Pipeline that represents the container. - * @param force - true to forcibly delete the container. - * @throws IOException - */ - @Override - public void deleteContainer(long containerId, Pipeline pipeline, - boolean force) throws IOException { - XceiverClientSpi client = null; - try { - client = xceiverClientManager.acquireClient(pipeline, containerId); - String traceID = UUID.randomUUID().toString(); - ContainerProtocolCalls - .deleteContainer(client, containerId, force, traceID); - storageContainerLocationClient - .deleteContainer(containerId); - if (LOG.isDebugEnabled()) { - LOG.debug("Deleted container {}, leader: {}, machines: {} ", - containerId, - pipeline.getLeader(), - pipeline.getMachines()); - } - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * Delete the container, this will release any resource it uses. - * @param containerID - containerID. - * @param force - True to forcibly delete the container. - * @throws IOException - */ - @Override - public void deleteContainer(long containerID, boolean force) - throws IOException { - ContainerWithPipeline info = getContainerWithPipeline(containerID); - deleteContainer(containerID, info.getPipeline(), force); - } - - /** - * {@inheritDoc} - */ - @Override - public List<ContainerInfo> listContainer(long startContainerID, - int count) throws IOException { - return storageContainerLocationClient.listContainer( - startContainerID, count); - } - - /** - * Get meta data from an existing container. - * - * @param containerID - ID of the container. - * @param pipeline - Pipeline where the container is located. - * @return ContainerInfo - * @throws IOException - */ - @Override - public ContainerData readContainer(long containerID, - Pipeline pipeline) throws IOException { - XceiverClientSpi client = null; - try { - client = xceiverClientManager.acquireClient(pipeline, containerID); - String traceID = UUID.randomUUID().toString(); - ReadContainerResponseProto response = - ContainerProtocolCalls.readContainer(client, containerID, traceID); - if (LOG.isDebugEnabled()) { - LOG.debug("Read container {}, leader: {}, machines: {} ", - containerID, - pipeline.getLeader(), - pipeline.getMachines()); - } - return response.getContainerData(); - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * Get meta data from an existing container. - * @param containerID - ID of the container. - * @return ContainerInfo - a message of protobuf which has basic info - * of a container. - * @throws IOException - */ - @Override - public ContainerData readContainer(long containerID) throws IOException { - ContainerWithPipeline info = getContainerWithPipeline(containerID); - return readContainer(containerID, info.getPipeline()); - } - - /** - * Given an id, return the pipeline associated with the container. - * @param containerId - String Container ID - * @return Pipeline of the existing container, corresponding to the given id. - * @throws IOException - */ - @Override - public ContainerInfo getContainer(long containerId) throws - IOException { - return storageContainerLocationClient.getContainer(containerId); - } - - /** - * Gets a container by Name -- Throws if the container does not exist. - * - * @param containerId - Container ID - * @return ContainerWithPipeline - * @throws IOException - */ - @Override - public ContainerWithPipeline getContainerWithPipeline(long containerId) - throws IOException { - return storageContainerLocationClient.getContainerWithPipeline(containerId); - } - - /** - * Close a container. - * - * @param pipeline the container to be closed. - * @throws IOException - */ - @Override - public void closeContainer(long containerId, Pipeline pipeline) - throws IOException { - XceiverClientSpi client = null; - try { - LOG.debug("Close container {}", pipeline); - /* - TODO: two orders here, revisit this later: - 1. close on SCM first, then on data node - 2. close on data node first, then on SCM - - with 1: if client failed after closing on SCM, then there is a - container SCM thinks as closed, but is actually open. Then SCM will no - longer allocate block to it, which is fine. But SCM may later try to - replicate this "closed" container, which I'm not sure is safe. - - with 2: if client failed after close on datanode, then there is a - container SCM thinks as open, but is actually closed. Then SCM will still - try to allocate block to it. Which will fail when actually doing the - write. No more data can be written, but at least the correctness and - consistency of existing data will maintain. - - For now, take the #2 way. - */ - // Actually close the container on Datanode - client = xceiverClientManager.acquireClient(pipeline, containerId); - String traceID = UUID.randomUUID().toString(); - - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.close, - ObjectStageChangeRequestProto.Stage.begin); - - ContainerProtocolCalls.closeContainer(client, containerId, traceID); - // Notify SCM to close the container - storageContainerLocationClient.notifyObjectStageChange( - ObjectStageChangeRequestProto.Type.container, - containerId, - ObjectStageChangeRequestProto.Op.close, - ObjectStageChangeRequestProto.Stage.complete); - } finally { - if (client != null) { - xceiverClientManager.releaseClient(client); - } - } - } - - /** - * Close a container. - * - * @throws IOException - */ - @Override - public void closeContainer(long containerId) - throws IOException { - ContainerWithPipeline info = getContainerWithPipeline(containerId); - Pipeline pipeline = info.getPipeline(); - closeContainer(containerId, pipeline); - } - - /** - * Get the the current usage information. - * @param containerID - ID of the container. - * @return the size of the given container. - * @throws IOException - */ - @Override - public long getContainerSize(long containerID) throws IOException { - // TODO : Fix this, it currently returns the capacity - // but not the current usage. - long size = getContainerSizeB(); - if (size == -1) { - throw new IOException("Container size unknown!"); - } - return size; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java deleted file mode 100644 index 9c59038..0000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.client; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.ParseException; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.concurrent.TimeUnit; - -/** - * Utility methods for Ozone and Container Clients. - * - * The methods to retrieve SCM service endpoints assume there is a single - * SCM service instance. This will change when we switch to replicated service - * instances for redundancy. - */ [email protected] [email protected] -public final class HddsClientUtils { - - private static final Logger LOG = LoggerFactory.getLogger( - HddsClientUtils.class); - - private static final int NO_PORT = -1; - - private HddsClientUtils() { - } - - /** - * Date format that used in ozone. Here the format is thread safe to use. - */ - private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT = - ThreadLocal.withInitial(() -> { - DateTimeFormatter format = - DateTimeFormatter.ofPattern(OzoneConsts.OZONE_DATE_FORMAT); - return format.withZone(ZoneId.of(OzoneConsts.OZONE_TIME_ZONE)); - }); - - - /** - * Convert time in millisecond to a human readable format required in ozone. - * @return a human readable string for the input time - */ - public static String formatDateTime(long millis) { - ZonedDateTime dateTime = ZonedDateTime.ofInstant( - Instant.ofEpochMilli(millis), DATE_FORMAT.get().getZone()); - return DATE_FORMAT.get().format(dateTime); - } - - /** - * Convert time in ozone date format to millisecond. - * @return time in milliseconds - */ - public static long formatDateTime(String date) throws ParseException { - Preconditions.checkNotNull(date, "Date string should not be null."); - return ZonedDateTime.parse(date, DATE_FORMAT.get()) - .toInstant().toEpochMilli(); - } - - - - /** - * verifies that bucket name / volume name is a valid DNS name. - * - * @param resName Bucket or volume Name to be validated - * - * @throws IllegalArgumentException - */ - public static void verifyResourceName(String resName) - throws IllegalArgumentException { - - if (resName == null) { - throw new IllegalArgumentException("Bucket or Volume name is null"); - } - - if ((resName.length() < OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH) || - (resName.length() > OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH)) { - throw new IllegalArgumentException( - "Bucket or Volume length is illegal, " + - "valid length is 3-63 characters"); - } - - if ((resName.charAt(0) == '.') || (resName.charAt(0) == '-')) { - throw new IllegalArgumentException( - "Bucket or Volume name cannot start with a period or dash"); - } - - if ((resName.charAt(resName.length() - 1) == '.') || - (resName.charAt(resName.length() - 1) == '-')) { - throw new IllegalArgumentException( - "Bucket or Volume name cannot end with a period or dash"); - } - - boolean isIPv4 = true; - char prev = (char) 0; - - for (int index = 0; index < resName.length(); index++) { - char currChar = resName.charAt(index); - - if (currChar != '.') { - isIPv4 = ((currChar >= '0') && (currChar <= '9')) && isIPv4; - } - - if (currChar > 'A' && currChar < 'Z') { - throw new IllegalArgumentException( - "Bucket or Volume name does not support uppercase characters"); - } - - if ((currChar != '.') && (currChar != '-')) { - if ((currChar < '0') || (currChar > '9' && currChar < 'a') || - (currChar > 'z')) { - throw new IllegalArgumentException("Bucket or Volume name has an " + - "unsupported character : " + - currChar); - } - } - - if ((prev == '.') && (currChar == '.')) { - throw new IllegalArgumentException("Bucket or Volume name should not " + - "have two contiguous periods"); - } - - if ((prev == '-') && (currChar == '.')) { - throw new IllegalArgumentException( - "Bucket or Volume name should not have period after dash"); - } - - if ((prev == '.') && (currChar == '-')) { - throw new IllegalArgumentException( - "Bucket or Volume name should not have dash after period"); - } - prev = currChar; - } - - if (isIPv4) { - throw new IllegalArgumentException( - "Bucket or Volume name cannot be an IPv4 address or all numeric"); - } - } - - /** - * verifies that bucket / volume name is a valid DNS name. - * - * @param resourceNames Array of bucket / volume names to be verified. - */ - public static void verifyResourceName(String... resourceNames) { - for (String resourceName : resourceNames) { - HddsClientUtils.verifyResourceName(resourceName); - } - } - - /** - * Checks that object parameters passed as reference is not null. - * - * @param references Array of object references to be checked. - * @param <T> - */ - public static <T> void checkNotNull(T... references) { - for (T ref: references) { - Preconditions.checkNotNull(ref); - } - } - - /** - * Returns the cache value to be used for list calls. - * @param conf Configuration object - * @return list cache size - */ - public static int getListCacheSize(Configuration conf) { - return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE, - OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT); - } - - /** - * @return a default instance of {@link CloseableHttpClient}. - */ - public static CloseableHttpClient newHttpClient() { - return HddsClientUtils.newHttpClient(new Configuration()); - } - - /** - * Returns a {@link CloseableHttpClient} configured by given configuration. - * If conf is null, returns a default instance. - * - * @param conf configuration - * @return a {@link CloseableHttpClient} instance. - */ - public static CloseableHttpClient newHttpClient(Configuration conf) { - long socketTimeout = OzoneConfigKeys - .OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT; - long connectionTimeout = OzoneConfigKeys - .OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT; - if (conf != null) { - socketTimeout = conf.getTimeDuration( - OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT, - OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - connectionTimeout = conf.getTimeDuration( - OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT, - OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - } - - CloseableHttpClient client = HttpClients.custom() - .setDefaultRequestConfig( - RequestConfig.custom() - .setSocketTimeout(Math.toIntExact(socketTimeout)) - .setConnectTimeout(Math.toIntExact(connectionTimeout)) - .build()) - .build(); - return client; - } - - /** - * Returns the maximum no of outstanding async requests to be handled by - * Standalone and Ratis client. - */ - public static int getMaxOutstandingRequests(Configuration config) { - return config - .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS, - ScmConfigKeys - .SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java deleted file mode 100644 index 73ad78c..0000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.client; - -/** - * Client facing classes for the container operations. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java deleted file mode 100644 index 9390bc1..0000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm; - -/** - * Classes for different type of container service client. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java deleted file mode 100644 index a483197..0000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.storage; - -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadChunkResponseProto; -import org.apache.hadoop.hdds.client.BlockID; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; - -/** - * An {@link InputStream} used by the REST service in combination with the - * SCMClient to read the value of a key from a sequence - * of container chunks. All bytes of the key value are stored in container - * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} - * instances. This class encapsulates all state management for iterating - * through the sequence of chunks and the sequence of buffers within each chunk. - */ -public class ChunkInputStream extends InputStream implements Seekable { - - private static final int EOF = -1; - - private final BlockID blockID; - private final String traceID; - private XceiverClientManager xceiverClientManager; - private XceiverClientSpi xceiverClient; - private List<ChunkInfo> chunks; - private int chunkIndex; - private long[] chunkOffset; - private List<ByteBuffer> buffers; - private int bufferIndex; - - /** - * Creates a new ChunkInputStream. - * - * @param blockID block ID of the chunk - * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls - * @param chunks list of chunks to read - * @param traceID container protocol call traceID - */ - public ChunkInputStream( - BlockID blockID, XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) { - this.blockID = blockID; - this.traceID = traceID; - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; - this.chunks = chunks; - this.chunkIndex = -1; - // chunkOffset[i] stores offset at which chunk i stores data in - // ChunkInputStream - this.chunkOffset = new long[this.chunks.size()]; - initializeChunkOffset(); - this.buffers = null; - this.bufferIndex = 0; - } - - private void initializeChunkOffset() { - int tempOffset = 0; - for (int i = 0; i < chunks.size(); i++) { - chunkOffset[i] = tempOffset; - tempOffset += chunks.get(i).getLen(); - } - } - - @Override - public synchronized int read() - throws IOException { - checkOpen(); - int available = prepareRead(1); - return available == EOF ? EOF : - Byte.toUnsignedInt(buffers.get(bufferIndex).get()); - } - - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - // According to the JavaDocs for InputStream, it is recommended that - // subclasses provide an override of bulk read if possible for performance - // reasons. In addition to performance, we need to do it for correctness - // reasons. The Ozone REST service uses PipedInputStream and - // PipedOutputStream to relay HTTP response data between a Jersey thread and - // a Netty thread. It turns out that PipedInputStream/PipedOutputStream - // have a subtle dependency (bug?) on the wrapped stream providing separate - // implementations of single-byte read and bulk read. Without this, get key - // responses might close the connection before writing all of the bytes - // advertised in the Content-Length. - if (b == null) { - throw new NullPointerException(); - } - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - checkOpen(); - int total = 0; - while (len > 0) { - int available = prepareRead(len); - if (available == EOF) { - return total != 0 ? total : EOF; - } - buffers.get(bufferIndex).get(b, off + total, available); - len -= available; - total += available; - } - return total; - } - - @Override - public synchronized void close() { - if (xceiverClientManager != null && xceiverClient != null) { - xceiverClientManager.releaseClient(xceiverClient); - xceiverClientManager = null; - xceiverClient = null; - } - } - - /** - * Checks if the stream is open. If not, throws an exception. - * - * @throws IOException if stream is closed - */ - private synchronized void checkOpen() throws IOException { - if (xceiverClient == null) { - throw new IOException("ChunkInputStream has been closed."); - } - } - - /** - * Prepares to read by advancing through chunks and buffers as needed until it - * finds data to return or encounters EOF. - * - * @param len desired length of data to read - * @return length of data available to read, possibly less than desired length - */ - private synchronized int prepareRead(int len) throws IOException { - for (;;) { - if (chunks == null || chunks.isEmpty()) { - // This must be an empty key. - return EOF; - } else if (buffers == null) { - // The first read triggers fetching the first chunk. - readChunkFromContainer(); - } else if (!buffers.isEmpty() && - buffers.get(bufferIndex).hasRemaining()) { - // Data is available from the current buffer. - ByteBuffer bb = buffers.get(bufferIndex); - return len > bb.remaining() ? bb.remaining() : len; - } else if (!buffers.isEmpty() && - !buffers.get(bufferIndex).hasRemaining() && - bufferIndex < buffers.size() - 1) { - // There are additional buffers available. - ++bufferIndex; - } else if (chunkIndex < chunks.size() - 1) { - // There are additional chunks available. - readChunkFromContainer(); - } else { - // All available input has been consumed. - return EOF; - } - } - } - - /** - * Attempts to read the chunk at the specified offset in the chunk list. If - * successful, then the data of the read chunk is saved so that its bytes can - * be returned from subsequent read calls. - * - * @throws IOException if there is an I/O error while performing the call - */ - private synchronized void readChunkFromContainer() throws IOException { - // On every chunk read chunkIndex should be increased so as to read the - // next chunk - chunkIndex += 1; - final ReadChunkResponseProto readChunkResponse; - final ChunkInfo chunkInfo = chunks.get(chunkIndex); - try { - readChunkResponse = ContainerProtocolCalls - .readChunk(xceiverClient, chunkInfo, blockID, traceID); - } catch (IOException e) { - throw new IOException("Unexpected OzoneException: " + e.toString(), e); - } - ByteString byteString = readChunkResponse.getData(); - if (byteString.size() != chunkInfo.getLen()) { - // Bytes read from chunk should be equal to chunk size. - throw new IOException(String - .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", - chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size())); - } - buffers = byteString.asReadOnlyByteBufferList(); - bufferIndex = 0; - } - - @Override - public synchronized void seek(long pos) throws IOException { - if (pos < 0 || (chunks.size() == 0 && pos > 0) - || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1) - .getLen()) { - throw new EOFException("EOF encountered pos: " + pos + " container key: " - + blockID.getLocalID()); - } - if (chunkIndex == -1) { - chunkIndex = Arrays.binarySearch(chunkOffset, pos); - } else if (pos < chunkOffset[chunkIndex]) { - chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos); - } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex) - .getLen()) { - chunkIndex = - Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos); - } - if (chunkIndex < 0) { - // Binary search returns -insertionPoint - 1 if element is not present - // in the array. insertionPoint is the point at which element would be - // inserted in the sorted array. We need to adjust the chunkIndex - // accordingly so that chunkIndex = insertionPoint - 1 - chunkIndex = -chunkIndex -2; - } - // adjust chunkIndex so that readChunkFromContainer reads the correct chunk - chunkIndex -= 1; - readChunkFromContainer(); - adjustBufferIndex(pos); - } - - private void adjustBufferIndex(long pos) { - long tempOffest = chunkOffset[chunkIndex]; - for (int i = 0; i < buffers.size(); i++) { - if (pos - tempOffest >= buffers.get(i).capacity()) { - tempOffest += buffers.get(i).capacity(); - } else { - bufferIndex = i; - break; - } - } - buffers.get(bufferIndex).position((int) (pos - tempOffest)); - } - - @Override - public synchronized long getPos() throws IOException { - return chunkIndex == -1 ? 0 : - chunkOffset[chunkIndex] + buffers.get(bufferIndex).position(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - - public BlockID getBlockID() { - return blockID; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java deleted file mode 100644 index 10b3bb5..0000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.storage; - -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; -import org.apache.hadoop.hdds.client.BlockID; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.UUID; - -import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls - .putBlock; -import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls - .writeChunk; - -/** - * An {@link OutputStream} used by the REST service in combination with the - * SCMClient to write the value of a key to a sequence - * of container chunks. Writes are buffered locally and periodically written to - * the container as a new chunk. In order to preserve the semantics that - * replacement of a pre-existing key is atomic, each instance of the stream has - * an internal unique identifier. This unique identifier and a monotonically - * increasing chunk index form a composite key that is used as the chunk name. - * After all data is written, a putKey call creates or updates the corresponding - * container key, and this call includes the full list of chunks that make up - * the key data. The list of chunks is updated all at once. Therefore, a - * concurrent reader never can see an intermediate state in which different - * chunks of data from different versions of the key data are interleaved. - * This class encapsulates all state management for buffering and writing - * through to the container. - */ -public class ChunkOutputStream extends OutputStream { - - private final BlockID blockID; - private final String key; - private final String traceID; - private final BlockData.Builder containerBlockData; - private XceiverClientManager xceiverClientManager; - private XceiverClientSpi xceiverClient; - private ByteBuffer buffer; - private final String streamId; - private int chunkIndex; - private int chunkSize; - - /** - * Creates a new ChunkOutputStream. - * - * @param blockID block ID - * @param key chunk key - * @param xceiverClientManager client manager that controls client - * @param xceiverClient client to perform container calls - * @param traceID container protocol call args - * @param chunkSize chunk size - */ - public ChunkOutputStream(BlockID blockID, String key, - XceiverClientManager xceiverClientManager, - XceiverClientSpi xceiverClient, String traceID, int chunkSize) { - this.blockID = blockID; - this.key = key; - this.traceID = traceID; - this.chunkSize = chunkSize; - KeyValue keyValue = KeyValue.newBuilder() - .setKey("TYPE").setValue("KEY").build(); - this.containerBlockData = BlockData.newBuilder() - .setBlockID(blockID.getDatanodeBlockIDProtobuf()) - .addMetadata(keyValue); - this.xceiverClientManager = xceiverClientManager; - this.xceiverClient = xceiverClient; - this.buffer = ByteBuffer.allocate(chunkSize); - this.streamId = UUID.randomUUID().toString(); - this.chunkIndex = 0; - } - - public ByteBuffer getBuffer() { - return buffer; - } - - @Override - public void write(int b) throws IOException { - checkOpen(); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - buffer.put((byte)b); - if (buffer.position() == chunkSize) { - flushBufferToChunk(rollbackPosition, rollbackLimit); - } - } - - @Override - public void write(byte[] b, int off, int len) - throws IOException { - if (b == null) { - throw new NullPointerException(); - } - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return; - } - checkOpen(); - while (len > 0) { - int writeLen = Math.min(chunkSize - buffer.position(), len); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - buffer.put(b, off, writeLen); - if (buffer.position() == chunkSize) { - flushBufferToChunk(rollbackPosition, rollbackLimit); - } - off += writeLen; - len -= writeLen; - } - } - - @Override - public void flush() throws IOException { - checkOpen(); - if (buffer.position() > 0) { - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - flushBufferToChunk(rollbackPosition, rollbackLimit); - } - } - - @Override - public void close() throws IOException { - if (xceiverClientManager != null && xceiverClient != null - && buffer != null) { - if (buffer.position() > 0) { - writeChunkToContainer(); - } - try { - putBlock(xceiverClient, containerBlockData.build(), traceID); - } catch (IOException e) { - throw new IOException( - "Unexpected Storage Container Exception: " + e.toString(), e); - } finally { - cleanup(); - } - } - } - - public void cleanup() { - xceiverClientManager.releaseClient(xceiverClient); - xceiverClientManager = null; - xceiverClient = null; - buffer = null; - } - - /** - * Checks if the stream is open. If not, throws an exception. - * - * @throws IOException if stream is closed - */ - private void checkOpen() throws IOException { - if (xceiverClient == null) { - throw new IOException("ChunkOutputStream has been closed."); - } - } - - /** - * Attempts to flush buffered writes by writing a new chunk to the container. - * If successful, then clears the buffer to prepare to receive writes for a - * new chunk. - * - * @param rollbackPosition position to restore in buffer if write fails - * @param rollbackLimit limit to restore in buffer if write fails - * @throws IOException if there is an I/O error while performing the call - */ - private void flushBufferToChunk(int rollbackPosition, - int rollbackLimit) throws IOException { - boolean success = false; - try { - writeChunkToContainer(); - success = true; - } finally { - if (success) { - buffer.clear(); - } else { - buffer.position(rollbackPosition); - buffer.limit(rollbackLimit); - } - } - } - - /** - * Writes buffered data as a new chunk to the container and saves chunk - * information to be used later in putKey call. - * - * @throws IOException if there is an I/O error while performing the call - */ - private void writeChunkToContainer() throws IOException { - buffer.flip(); - ByteString data = ByteString.copyFrom(buffer); - ChunkInfo chunk = ChunkInfo - .newBuilder() - .setChunkName( - DigestUtils.md5Hex(key) + "_stream_" - + streamId + "_chunk_" + ++chunkIndex) - .setOffset(0) - .setLen(data.size()) - .build(); - try { - writeChunk(xceiverClient, chunk, blockID, data, traceID); - } catch (IOException e) { - throw new IOException( - "Unexpected Storage Container Exception: " + e.toString(), e); - } - containerBlockData.addChunks(chunk); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/package-info.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/package-info.java deleted file mode 100644 index 6e7ce94..0000000 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.storage; - -/** - * Low level IO streams to upload/download chunks from container service. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml deleted file mode 100644 index c7db679..0000000 --- a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml +++ /dev/null @@ -1,28 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<FindBugsFilter> - <Match> - <Package name="org.apache.hadoop.hdds.protocol.proto"/> - </Match> - <Match> - <Package name="org.apache.hadoop.hdds.protocol.datanode.proto"/> - </Match> - <Match> - <Class name="org.apache.hadoop.hdds.cli.GenericCli"></Class> - <Bug pattern="DM_EXIT" /> - </Match> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml deleted file mode 100644 index 64ebe9b..0000000 --- a/hadoop-hdds/common/pom.xml +++ /dev/null @@ -1,250 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 -http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdds</artifactId> - <version>0.3.0-SNAPSHOT</version> - </parent> - <artifactId>hadoop-hdds-common</artifactId> - <version>0.3.0-SNAPSHOT</version> - <description>Apache Hadoop Distributed Data Store Common</description> - <name>Apache Hadoop HDDS Common</name> - <packaging>jar</packaging> - - <properties> - <hdds.version>0.3.0-SNAPSHOT</hdds.version> - <log4j2.version>2.11.0</log4j2.version> - <disruptor.version>3.4.2</disruptor.version> - <declared.hdds.version>${hdds.version}</declared.hdds.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.fusesource.leveldbjni</groupId> - <artifactId>leveldbjni-all</artifactId> - </dependency> - - <dependency> - <artifactId>ratis-server</artifactId> - <groupId>org.apache.ratis</groupId> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-core</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <artifactId>ratis-netty</artifactId> - <groupId>org.apache.ratis</groupId> - </dependency> - <dependency> - <artifactId>ratis-grpc</artifactId> - <groupId>org.apache.ratis</groupId> - </dependency> - <dependency> - <groupId>com.google.errorprone</groupId> - <artifactId>error_prone_annotations</artifactId> - <version>2.2.0</version> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.rocksdb</groupId> - <artifactId>rocksdbjni</artifactId> - <version>5.14.2</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> - <version>${log4j2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - <version>${log4j2.version}</version> - </dependency> - <dependency> - <groupId>com.lmax</groupId> - <artifactId>disruptor</artifactId> - <version>${disruptor.version}</version> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-pool2</artifactId> - <version>2.6.0</version> - </dependency> - - </dependencies> - - <build> - <resources> - <resource> - <directory>${basedir}/src/main/resources</directory> - <excludes> - <exclude>hdds-version-info.properties</exclude> - </excludes> - <filtering>false</filtering> - </resource> - <resource> - <directory>${basedir}/src/main/resources</directory> - <includes> - <include>hdds-version-info.properties</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - <extensions> - <extension> - <groupId>kr.motd.maven</groupId> - <artifactId>os-maven-plugin</artifactId> - <version>${os-maven-plugin.version}</version> - </extension> - </extensions> - <plugins> - <plugin> - <groupId>org.xolstice.maven.plugins</groupId> - <artifactId>protobuf-maven-plugin</artifactId> - <version>${protobuf-maven-plugin.version}</version> - <extensions>true</extensions> - <configuration> - <protocArtifact> - com.google.protobuf:protoc:${protobuf-compile.version}:exe:${os.detected.classifier} - </protocArtifact> - <protoSourceRoot>${basedir}/src/main/proto/</protoSourceRoot> - <includes> - <include>DatanodeContainerProtocol.proto</include> - </includes> - <outputDirectory>target/generated-sources/java</outputDirectory> - <clearOutputDirectory>false</clearOutputDirectory> - </configuration> - <executions> - <execution> - <id>compile-protoc</id> - <goals> - <goal>compile</goal> - <goal>test-compile</goal> - <goal>compile-custom</goal> - <goal>test-compile-custom</goal> - </goals> - <configuration> - <pluginId>grpc-java</pluginId> - <pluginArtifact> - io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} - </pluginArtifact> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <phase>generate-sources</phase> - <configuration> - <tasks> - <replace token="com.google.protobuf" value="org.apache.ratis.shaded.com.google.protobuf" - dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto"> - </replace> - <replace token="io.grpc" value="org.apache.ratis.shaded.io.grpc" - dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto"> - </replace> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-maven-plugins</artifactId> - <executions> - <execution> - <id>version-info</id> - <phase>generate-resources</phase> - <goals> - <goal>version-info</goal> - </goals> - <configuration> - <source> - <directory>${basedir}/../</directory> - <includes> - <include>*/src/main/java/**/*.java</include> - <include>*/src/main/proto/*.proto</include> - </includes> - </source> - </configuration> - </execution> - <execution> - <id>compile-protoc</id> - <goals> - <goal>protoc</goal> - </goals> - <configuration> - <protocVersion>${protobuf.version}</protocVersion> - <protocCommand>${protoc.path}</protocCommand> - <imports> - <param> - ${basedir}/../../hadoop-common-project/hadoop-common/src/main/proto - </param> - <param> - ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ - </param> - <param> - ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ - </param> - <param>${basedir}/src/main/proto</param> - </imports> - <source> - <directory>${basedir}/src/main/proto</directory> - <includes> - <include>StorageContainerLocationProtocol.proto</include> - <include>hdds.proto</include> - <include>ScmBlockLocationProtocol.proto</include> - </includes> - </source> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile> - </configuration> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/common/src/main/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/conf/log4j.properties b/hadoop-hdds/common/src/main/conf/log4j.properties deleted file mode 100644 index 663e254..0000000 --- a/hadoop-hdds/common/src/main/conf/log4j.properties +++ /dev/null @@ -1,157 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Define some default values that can be overridden by system properties -hadoop.root.logger=INFO,console -hadoop.log.dir=. -hadoop.log.file=hadoop.log - -# Define the root logger to the system property "hadoop.root.logger". -log4j.rootLogger=${hadoop.root.logger}, EventCounter - -# Logging Threshold -log4j.threshold=ALL - -# Null Appender -log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender - -# -# Rolling File Appender - cap space usage at 5gb. -# -hadoop.log.maxfilesize=256MB -hadoop.log.maxbackupindex=20 -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file} - -log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize} -log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex} - -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout - -# Pattern format: Date LogLevel LoggerName LogMessage -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n -# Debugging Pattern format -#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - -# -# Daily Rolling File Appender -# - -log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender -log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file} - -# Rollover at midnight -log4j.appender.DRFA.DatePattern=.yyyy-MM-dd - -log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout - -# Pattern format: Date LogLevel LoggerName LogMessage -log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n -# Debugging Pattern format -#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - -# -# console -# Add "console" to rootlogger above if you want to use this -# - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n - -# -# TaskLog Appender -# -log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender - -log4j.appender.TLA.layout=org.apache.log4j.PatternLayout -log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n - -# -# HDFS block state change log from block manager -# -# Uncomment the following to log normal block state change -# messages from BlockManager in NameNode. -#log4j.logger.BlockStateChange=DEBUG - -# -#Security appender -# -hadoop.security.logger=INFO,NullAppender -hadoop.security.log.maxfilesize=256MB -hadoop.security.log.maxbackupindex=20 -log4j.category.SecurityLogger=${hadoop.security.logger} -hadoop.security.log.file=SecurityAuth-${user.name}.audit -log4j.appender.RFAS=org.apache.log4j.RollingFileAppender -log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file} -log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout -log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n -log4j.appender.RFAS.MaxFileSize=${hadoop.security.log.maxfilesize} -log4j.appender.RFAS.MaxBackupIndex=${hadoop.security.log.maxbackupindex} - -# -# Daily Rolling Security appender -# -log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender -log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file} -log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout -log4j.appender.DRFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n -log4j.appender.DRFAS.DatePattern=.yyyy-MM-dd - - -# Custom Logging levels -# AWS SDK & S3A FileSystem -#log4j.logger.com.amazonaws=ERROR -log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR -#log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN - -# -# Event Counter Appender -# Sends counts of logging messages at different severity levels to Hadoop Metrics. -# -log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter - - -log4j.logger.org.apache.hadoop.ozone=DEBUG,OZONE,FILE - -# Do not log into datanode logs. Remove this line to have single log. -log4j.additivity.org.apache.hadoop.ozone=false - -# For development purposes, log both to console and log file. -log4j.appender.OZONE=org.apache.log4j.ConsoleAppender -log4j.appender.OZONE.Threshold=info -log4j.appender.OZONE.layout=org.apache.log4j.PatternLayout -log4j.appender.OZONE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p \ - %X{component} %X{function} %X{resource} %X{user} %X{request} - %m%n - -# Real ozone logger that writes to ozone.log -log4j.appender.FILE=org.apache.log4j.DailyRollingFileAppender -log4j.appender.FILE.File=${hadoop.log.dir}/ozone.log -log4j.appender.FILE.Threshold=debug -log4j.appender.FILE.layout=org.apache.log4j.PatternLayout -log4j.appender.FILE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p \ -(%F:%L) %X{function} %X{resource} %X{user} %X{request} - \ -%m%n - -# Log levels of third-party libraries -log4j.logger.org.apache.commons.beanutils=WARN - -log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR -log4j.logger.org.apache.ratis.conf.ConfUtils=WARN -log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java deleted file mode 100644 index 856d113..0000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds; - -import org.apache.hadoop.utils.db.DBProfile; - -/** - * This class contains constants for configuration keys and default values - * used in hdds. - */ -public final class HddsConfigKeys { - - /** - * Do not instantiate. - */ - private HddsConfigKeys() { - } - - public static final String HDDS_HEARTBEAT_INTERVAL = - "hdds.heartbeat.interval"; - public static final String HDDS_HEARTBEAT_INTERVAL_DEFAULT = - "30s"; - - public static final String HDDS_NODE_REPORT_INTERVAL = - "hdds.node.report.interval"; - public static final String HDDS_NODE_REPORT_INTERVAL_DEFAULT = - "60s"; - - public static final String HDDS_CONTAINER_REPORT_INTERVAL = - "hdds.container.report.interval"; - public static final String HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT = - "60s"; - - public static final String HDDS_PIPELINE_REPORT_INTERVAL = - "hdds.pipeline.report.interval"; - public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT = - "60s"; - - public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL = - "hdds.command.status.report.interval"; - public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT = - "60s"; - - public static final String HDDS_CONTAINER_ACTION_MAX_LIMIT = - "hdds.container.action.max.limit"; - public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT = - 20; - - public static final String HDDS_PIPELINE_ACTION_MAX_LIMIT = - "hdds.pipeline.action.max.limit"; - public static final int HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT = - 20; - - // Configuration to allow volume choosing policy. - public static final String HDDS_DATANODE_VOLUME_CHOOSING_POLICY = - "hdds.datanode.volume.choosing.policy"; - - // DB Profiles used by ROCKDB instances. - public static final String HDDS_DB_PROFILE = "hdds.db.profile"; - public static final DBProfile HDDS_DEFAULT_DB_PROFILE = DBProfile.SSD; - - // Once a container usage crosses this threshold, it is eligible for - // closing. - public static final String HDDS_CONTAINER_CLOSE_THRESHOLD = - "hdds.container.close.threshold"; - public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; - - public static final String HDDS_SCM_CHILLMODE_ENABLED = - "hdds.scm.chillmode.enabled"; - public static final boolean HDDS_SCM_CHILLMODE_ENABLED_DEFAULT = true; - - // % of containers which should have at least one reported replica - // before SCM comes out of chill mode. - public static final String HDDS_SCM_CHILLMODE_THRESHOLD_PCT = - "hdds.scm.chillmode.threshold.pct"; - public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99; - - public static final String HDDS_LOCK_MAX_CONCURRENCY = - "hdds.lock.max.concurrency"; - public static final int HDDS_LOCK_MAX_CONCURRENCY_DEFAULT = 100; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java deleted file mode 100644 index b244b8c..0000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsIdFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds; - -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - -/** - * HDDS Id generator. - */ -public final class HddsIdFactory { - private HddsIdFactory() { - } - - private static final AtomicLong LONG_COUNTER = new AtomicLong( - System.currentTimeMillis()); - - /** - * Returns an incrementing long. This class doesn't - * persist initial value for long Id's, so incremental id's after restart - * may collide with previously generated Id's. - * - * @return long - */ - public static long getLongId() { - return LONG_COUNTER.incrementAndGet(); - } - - /** - * Returns a uuid. - * - * @return UUID. - */ - public static UUID getUUId() { - return UUID.randomUUID(); - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
