This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new d14c56d HDDS-916. MultipartUpload: Complete Multipart upload request.
Contributed by Bharat Viswanadham.
d14c56d is described below
commit d14c56d1509da1cf69d92f2f9810506863669e54
Author: Márton Elek <[email protected]>
AuthorDate: Mon Jan 7 10:42:04 2019 +0100
HDDS-916. MultipartUpload: Complete Multipart upload request. Contributed
by Bharat Viswanadham.
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 4 +
.../apache/hadoop/ozone/client/OzoneBucket.java | 26 ++
.../ozone/client/protocol/ClientProtocol.java | 27 +++
.../hadoop/ozone/client/rest/RestClient.java | 10 +
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 27 +++
.../org/apache/hadoop/ozone/audit/OMAction.java | 3 +-
.../ozone/om/helpers/OmMultipartKeyInfo.java | 7 +-
.../om/helpers/OmMultipartUploadCompleteInfo.java | 70 ++++++
.../ozone/om/helpers/OmMultipartUploadList.java | 63 +++++
.../ozone/om/protocol/OzoneManagerProtocol.java | 21 +-
...OzoneManagerProtocolClientSideTranslatorPB.java | 53 +++-
.../src/main/proto/OzoneManagerProtocol.proto | 25 ++
.../ozone/client/rpc/TestOzoneRpcClient.java | 270 +++++++++++++++++++++
.../org/apache/hadoop/ozone/om/KeyManager.java | 20 +-
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 157 ++++++++++++
.../java/org/apache/hadoop/ozone/om/OMMetrics.java | 11 +
.../org/apache/hadoop/ozone/om/OzoneManager.java | 27 +++
.../hadoop/ozone/om/exceptions/OMException.java | 4 +
...OzoneManagerProtocolServerSideTranslatorPB.java | 66 ++++-
19 files changed, 883 insertions(+), 8 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 957aab0..118cf24 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -237,10 +237,14 @@ public final class OzoneConsts {
public static final String REPLICATION_TYPE = "replicationType";
public static final String REPLICATION_FACTOR = "replicationFactor";
public static final String KEY_LOCATION_INFO = "keyLocationInfo";
+ public static final String MULTIPART_LIST = "multipartList";
// For OM metrics saving to a file
public static final String OM_METRICS_FILE = "omMetrics";
public static final String OM_METRICS_TEMP_FILE = OM_METRICS_FILE + ".tmp";
+ // For Multipart upload
+ public static final int OM_MULTIPART_MIN_SIZE = 5 * 1024 * 1024;
+
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 3a681d7..e6a6e12 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -31,10 +31,12 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
/**
@@ -353,6 +355,15 @@ public class OzoneBucket {
defaultReplication);
}
+ /**
+ * Create a part key for a multipart upload key.
+ * @param key
+ * @param size
+ * @param partNumber
+ * @param uploadID
+ * @return OzoneOutputStream
+ * @throws IOException
+ */
public OzoneOutputStream createMultipartKey(String key, long size,
int partNumber, String uploadID)
throws IOException {
@@ -361,6 +372,21 @@ public class OzoneBucket {
}
/**
+ * Complete Multipart upload. This will combine all the parts and make the
+ * key visible in ozone.
+ * @param key
+ * @param uploadID
+ * @param partsMap
+ * @return OmMultipartUploadCompleteInfo
+ * @throws IOException
+ */
+ public OmMultipartUploadCompleteInfo completeMultipartUpload(String key,
+ String uploadID, Map<Integer, String> partsMap) throws IOException {
+ return proxy.completeMultipartUpload(volumeName, name, key, uploadID,
+ partsMap);
+ }
+
+ /**
* An Iterator to iterate over {@link OzoneKey} list.
*/
private class KeyIterator implements Iterator<OzoneKey> {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 02dc530..5960943 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -27,9 +27,11 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
/**
* An implementer of this interface is capable of connecting to Ozone Cluster
@@ -401,10 +403,35 @@ public interface ClientProtocol {
bucketName, String keyName, ReplicationType type, ReplicationFactor
factor) throws IOException;
+ /**
+ * Create a part key for a multipart upload key.
+ * @param volumeName
+ * @param bucketName
+ * @param keyName
+ * @param size
+ * @param partNumber
+ * @param uploadID
+ * @return OzoneOutputStream
+ * @throws IOException
+ */
OzoneOutputStream createMultipartKey(String volumeName, String bucketName,
String keyName, long size,
int partNumber, String uploadID)
throws IOException;
+ /**
+ * Complete Multipart upload. This will combine all the parts and make the
+ * key visible in ozone.
+ * @param volumeName
+ * @param bucketName
+ * @param keyName
+ * @param uploadID
+ * @param partsMap
+ * @return OmMultipartUploadCompleteInfo
+ * @throws IOException
+ */
+ OmMultipartUploadCompleteInfo completeMultipartUpload(String volumeName,
+ String bucketName, String keyName, String uploadID,
+ Map<Integer, String> partsMap) throws IOException;
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
index b40d5e1..85e559f 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
@@ -43,6 +43,7 @@ import
org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
import org.apache.hadoop.ozone.web.response.ListBuckets;
@@ -79,6 +80,7 @@ import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
@@ -975,4 +977,12 @@ public class RestClient implements ClientProtocol {
throw new UnsupportedOperationException("Ozone REST protocol does not " +
"support this operation.");
}
+
+ @Override
+ public OmMultipartUploadCompleteInfo completeMultipartUpload(
+ String volumeName, String bucketName, String keyName, String uploadID,
+ Map<Integer, String> partsMap) throws IOException {
+ throw new UnsupportedOperationException("Ozone REST protocol does not " +
+ "support this operation.");
+ }
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 1743372..72da5f8 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -753,4 +755,29 @@ public class RpcClient implements ClientProtocol {
return new OzoneOutputStream(groupOutputStream);
}
+ @Override
+ public OmMultipartUploadCompleteInfo completeMultipartUpload(
+ String volumeName, String bucketName, String keyName, String uploadID,
+ Map<Integer, String> partsMap) throws IOException {
+ HddsClientUtils.verifyResourceName(volumeName, bucketName);
+ HddsClientUtils.checkNotNull(keyName, uploadID);
+
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setMultipartUploadID(uploadID)
+ .build();
+
+ OmMultipartUploadList omMultipartUploadList = new OmMultipartUploadList(
+ partsMap);
+
+ OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
+ ozoneManagerClient.completeMultipartUpload(keyArgs,
+ omMultipartUploadList);
+
+ return omMultipartUploadCompleteInfo;
+
+ }
+
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index a65f6bc..d3b2522 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -48,7 +48,8 @@ public enum OMAction implements AuditAction {
READ_KEY,
LIST_S3BUCKETS,
INITIATE_MULTIPART_UPLOAD,
- COMMIT_MULTIPART_UPLOAD_PARTKEY;
+ COMMIT_MULTIPART_UPLOAD_PARTKEY,
+ COMPLETE_MULTIPART_UPLOAD;
@Override
public String getAction() {
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
index 152091d..2c976fb 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
@@ -23,6 +23,7 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
import java.util.HashMap;
import java.util.Map;
+import java.util.TreeMap;
/**
* This class represents multipart upload information for a key, which holds
@@ -30,7 +31,7 @@ import java.util.Map;
*/
public class OmMultipartKeyInfo {
private String uploadID;
- private Map<Integer, PartKeyInfo> partKeyInfoList;
+ private TreeMap<Integer, PartKeyInfo> partKeyInfoList;
/**
* Construct OmMultipartKeyInfo object which holds multipart upload
@@ -40,7 +41,7 @@ public class OmMultipartKeyInfo {
*/
public OmMultipartKeyInfo(String id, Map<Integer, PartKeyInfo> list) {
this.uploadID = id;
- this.partKeyInfoList = new HashMap<>(list);
+ this.partKeyInfoList = new TreeMap<>(list);
}
/**
@@ -51,7 +52,7 @@ public class OmMultipartKeyInfo {
return uploadID;
}
- public Map<Integer, PartKeyInfo> getPartKeyInfoList() {
+ public TreeMap<Integer, PartKeyInfo> getPartKeyInfoList() {
return partKeyInfoList;
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java
new file mode 100644
index 0000000..71ce882
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+/**
+ * This class holds information about the response of complete Multipart
+ * upload request.
+ */
+public class OmMultipartUploadCompleteInfo {
+
+ private String volume;
+ private String bucket;
+ private String key;
+ private String hash; // this is used as ETag for S3.
+
+ public OmMultipartUploadCompleteInfo(String volumeName, String bucketName,
+ String keyName, String md5) {
+ this.volume = volumeName;
+ this.bucket = bucketName;
+ this.key = keyName;
+ this.hash = md5;
+ }
+
+ public String getVolume() {
+ return volume;
+ }
+
+ public void setVolume(String volume) {
+ this.volume = volume;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public void setBucket(String bucket) {
+ this.bucket = bucket;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public String getHash() {
+ return hash;
+ }
+
+ public void setHash(String hash) {
+ this.hash = hash;
+ }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
new file mode 100644
index 0000000..99cd5ad
--- /dev/null
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class represents multipart list, which is required for
+ * CompleteMultipart upload request.
+ */
+public class OmMultipartUploadList {
+
+ private final TreeMap<Integer, String> multipartMap;
+
+ /**
+ * Construct OmMultipartUploadList which holds multipart map which contains
+ * part number and part name.
+ * @param partMap
+ */
+ public OmMultipartUploadList(Map<Integer, String> partMap) {
+ this.multipartMap = new TreeMap<>(partMap);
+ }
+
+ /**
+ * Return multipartMap which is a map of part number and part name.
+ * @return multipartMap
+ */
+ public TreeMap<Integer, String> getMultipartMap() {
+ return multipartMap;
+ }
+
+ /**
+ * Construct Part list from the multipartMap.
+ * @return List<Part>
+ */
+ public List<Part> getPartsList() {
+ List<Part> partList = new ArrayList<>();
+ multipartMap.forEach((partNumber, partName) -> partList.add(Part
+
.newBuilder().setPartName(partName).setPartNumber(partNumber).build()));
+ return partList;
+ }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index ee1ff6f..09da1ab 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -315,9 +317,26 @@ public interface OzoneManagerProtocol {
OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
IOException;
-
+ /**
+ * Commit Multipart upload part file.
+ * @param omKeyArgs
+ * @param clientID
+ * @return OmMultipartCommitUploadPartInfo
+ * @throws IOException
+ */
OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
OmKeyArgs omKeyArgs, long clientID) throws IOException;
+ /**
+ * Complete Multipart upload Request.
+ * @param omKeyArgs
+ * @param multipartUploadList
+ * @return OmMultipartUploadCompleteInfo
+ * @throws IOException
+ */
+ OmMultipartUploadCompleteInfo completeMultipartUpload(
+ OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
+ throws IOException;
+
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 2059c83..220e839 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -88,6 +90,10 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .MultipartUploadCompleteRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .MultipartUploadCompleteResponse;
import org.apache.hadoop.ozone.protocol.proto
.OzoneManagerProtocolProtos.RenameKeyRequest;
import org.apache.hadoop.ozone.protocol.proto
@@ -970,6 +976,11 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
@Override
public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
OmKeyArgs omKeyArgs, long clientId) throws IOException {
+
+ List<OmKeyLocationInfo> locationInfoList = omKeyArgs.getLocationInfoList();
+ Preconditions.checkNotNull(locationInfoList);
+
+
MultipartCommitUploadPartRequest.Builder multipartCommitUploadPartRequest
= MultipartCommitUploadPartRequest.newBuilder();
@@ -979,7 +990,11 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
.setKeyName(omKeyArgs.getKeyName())
.setMultipartUploadID(omKeyArgs.getMultipartUploadID())
.setIsMultipartKey(omKeyArgs.getIsMultipartKey())
- .setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber());
+ .setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber())
+ .setDataSize(omKeyArgs.getDataSize())
+ .addAllKeyLocations(
+ locationInfoList.stream().map(OmKeyLocationInfo::getProtobuf)
+ .collect(Collectors.toList()));
multipartCommitUploadPartRequest.setClientID(clientId);
multipartCommitUploadPartRequest.setKeyArgs(keyArgs.build());
@@ -1002,6 +1017,42 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
return info;
}
+ @Override
+ public OmMultipartUploadCompleteInfo completeMultipartUpload(
+ OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
+ throws IOException {
+ MultipartUploadCompleteRequest.Builder multipartUploadCompleteRequest =
+ MultipartUploadCompleteRequest.newBuilder();
+
+ KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+ .setVolumeName(omKeyArgs.getVolumeName())
+ .setBucketName(omKeyArgs.getBucketName())
+ .setKeyName(omKeyArgs.getKeyName())
+ .setMultipartUploadID(omKeyArgs.getMultipartUploadID());
+
+ multipartUploadCompleteRequest.setKeyArgs(keyArgs.build());
+ multipartUploadCompleteRequest.addAllPartsList(multipartUploadList
+ .getPartsList());
+
+ OMRequest omRequest = createOMRequest(
+ Type.CompleteMultiPartUpload)
+ .setCompleteMultiPartUploadRequest(
+ multipartUploadCompleteRequest.build()).build();
+
+ MultipartUploadCompleteResponse response = submitRequest(omRequest)
+ .getCompleteMultiPartUploadResponse();
+
+ if (response.getStatus() != Status.OK) {
+ throw new IOException("Complete multipart upload failed, error:" +
+ response.getStatus());
+ }
+
+ OmMultipartUploadCompleteInfo info = new
+ OmMultipartUploadCompleteInfo(response.getVolume(), response
+ .getBucket(), response.getKey(), response.getHash());
+ return info;
+ }
+
public List<ServiceInfo> getServiceList() throws IOException {
ServiceListRequest req = ServiceListRequest.newBuilder().build();
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 93c86f1..b32d324 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -65,6 +65,7 @@ enum Type {
ListS3Buckets = 44;
InitiateMultiPartUpload = 45;
CommitMultiPartUpload = 46;
+ CompleteMultiPartUpload = 47;
ServiceList = 51;
}
@@ -105,6 +106,7 @@ message OMRequest {
optional S3ListBucketsRequest listS3BucketsRequest =
44;
optional MultipartInfoInitiateRequest initiateMultiPartUploadRequest =
45;
optional MultipartCommitUploadPartRequest commitMultiPartUploadRequest =
46;
+ optional MultipartUploadCompleteRequest completeMultiPartUploadRequest =
47;
optional ServiceListRequest serviceListRequest =
51;
}
@@ -146,6 +148,7 @@ message OMResponse {
optional S3ListBucketsResponse listS3BucketsResponse =
44;
optional MultipartInfoInitiateResponse initiateMultiPartUploadResponse =
45;
optional MultipartCommitUploadPartResponse commitMultiPartUploadResponse =
46;
+ optional MultipartUploadCompleteResponse completeMultiPartUploadResponse =
47;
optional ServiceListResponse ServiceListResponse =
51;
}
@@ -177,6 +180,10 @@ enum Status {
INITIATE_MULTIPART_UPLOAD_ERROR = 24;
MULTIPART_UPLOAD_PARTFILE_ERROR = 25;
NO_SUCH_MULTIPART_UPLOAD_ERROR = 26;
+ MISMATCH_MULTIPART_LIST = 27;
+ MISSING_UPLOAD_PARTS = 28;
+ COMPLETE_MULTIPART_UPLOAD_ERROR = 29;
+ ENTITY_TOO_SMALL = 30;
}
@@ -583,6 +590,24 @@ message MultipartCommitUploadPartResponse {
required Status status = 2;
}
+message MultipartUploadCompleteRequest {
+ required KeyArgs keyArgs = 1;
+ repeated Part partsList = 2;
+}
+
+message MultipartUploadCompleteResponse {
+ optional string volume = 1;
+ optional string bucket = 2;
+ optional string key = 3;
+ optional string hash = 4; // This will be used as etag for s3
+ required Status status = 5;
+}
+
+message Part {
+ required uint32 partNumber = 1;
+ required string partName = 2;
+}
+
/**
The OM service that takes care of Ozone namespace.
*/
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 90c3c1f..f82da16 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -69,8 +69,11 @@ import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.UUID;
import static org.hamcrest.CoreMatchers.containsString;
@@ -1497,7 +1500,274 @@ public class TestOzoneRpcClient {
GenericTestUtils.assertExceptionContains("NO_SUCH_MULTIPART_UPLOAD_ERROR",
ex);
}
+ }
+
+ @Test
+ public void testMultipartUpload() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ doMultipartUpload(bucket, keyName, (byte)98);
+
+ }
+
+
+ @Test
+ public void testMultipartUploadOverride() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ doMultipartUpload(bucket, keyName, (byte)96);
+
+ // Initiate Multipart upload again, now we should read latest version, as
+ // read always reads latest blocks.
+ doMultipartUpload(bucket, keyName, (byte)97);
+
+ }
+
+
+ @Test
+ public void testMultipartUploadWithPartsLessThanMinSize() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ // Initiate multipart upload
+ String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+ .STAND_ALONE, ReplicationFactor.ONE);
+
+ // Upload Parts
+ Map<Integer, String> partsMap = new TreeMap<>();
+ // Uploading part 1 with less than min size
+ String partName = uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(
+ "UTF-8"));
+ partsMap.put(1, partName);
+
+ partName = uploadPart(bucket, keyName, uploadID, 2, "data".getBytes(
+ "UTF-8"));
+ partsMap.put(2, partName);
+
+
+ // Complete multipart upload
+
+ try {
+ completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+ fail("testMultipartUploadWithPartsLessThanMinSize failed");
+ } catch (IOException ex) {
+ GenericTestUtils.assertExceptionContains("ENTITY_TOO_SMALL", ex);
+ }
+
+ }
+
+
+
+ @Test
+ public void testMultipartUploadWithPartsMisMatchWithListSizeDifferent()
+ throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+ .STAND_ALONE, ReplicationFactor.ONE);
+
+ // We have not uploaded any parts, but passing some list it should throw
+ // error.
+ TreeMap<Integer, String> partsMap = new TreeMap<>();
+ partsMap.put(1, UUID.randomUUID().toString());
+
+ try {
+ completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+ fail("testMultipartUploadWithPartsMisMatch");
+ } catch (IOException ex) {
+ GenericTestUtils.assertExceptionContains("MISMATCH_MULTIPART_LIST", ex);
+ }
+
+ }
+
+ @Test
+ public void testMultipartUploadWithPartsMisMatchWithIncorrectPartName()
+ throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+ .STAND_ALONE, ReplicationFactor.ONE);
+
+ uploadPart(bucket, keyName, uploadID, 1, "data".getBytes("UTF-8"));
+ // We have not uploaded any parts, but passing some list it should throw
+ // error.
+ TreeMap<Integer, String> partsMap = new TreeMap<>();
+ partsMap.put(1, UUID.randomUUID().toString());
+
+ try {
+ completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+ fail("testMultipartUploadWithPartsMisMatch");
+ } catch (IOException ex) {
+ GenericTestUtils.assertExceptionContains("MISMATCH_MULTIPART_LIST", ex);
+ }
+
+ }
+
+ @Test
+ public void testMultipartUploadWithMissingParts() throws Exception {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ String keyName = UUID.randomUUID().toString();
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+ .STAND_ALONE, ReplicationFactor.ONE);
+
+ uploadPart(bucket, keyName, uploadID, 1, "data".getBytes("UTF-8"));
+ // We have not uploaded any parts, but passing some list it should throw
+ // error.
+ TreeMap<Integer, String> partsMap = new TreeMap<>();
+ partsMap.put(3, "random");
+
+ try {
+ completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+ fail("testMultipartUploadWithPartsMisMatch");
+ } catch (IOException ex) {
+ GenericTestUtils.assertExceptionContains("MISSING_UPLOAD_PARTS", ex);
+ }
+ }
+
+
+ private byte[] generateData(int size, byte val) {
+ byte[] chars = new byte[size];
+ Arrays.fill(chars, val);
+ return chars;
+ }
+
+
+ private void doMultipartUpload(OzoneBucket bucket, String keyName, byte val)
+ throws Exception {
+ // Initiate Multipart upload request
+ String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+ .RATIS, ReplicationFactor.THREE);
+
+ // Upload parts
+ Map<Integer, String> partsMap = new TreeMap<>();
+
+ // get 5mb data, as each part should be of min 5mb, last part can be less
+ // than 5mb
+ int length = 0;
+ byte[] data = generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, val);
+ String partName = uploadPart(bucket, keyName, uploadID, 1, data);
+ partsMap.put(1, partName);
+ length += data.length;
+
+
+ partName = uploadPart(bucket, keyName, uploadID, 2, data);
+ partsMap.put(2, partName);
+ length += data.length;
+
+ String part3 = UUID.randomUUID().toString();
+ partName = uploadPart(bucket, keyName, uploadID, 3, part3.getBytes(
+ "UTF-8"));
+ partsMap.put(3, partName);
+ length += part3.getBytes("UTF-8").length;
+
+
+ // Complete multipart upload request
+ completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+
+
+ //Now Read the key which has been completed multipart upload.
+ byte[] fileContent = new byte[data.length + data.length + part3.getBytes(
+ "UTF-8").length];
+ OzoneInputStream inputStream = bucket.readKey(keyName);
+ inputStream.read(fileContent);
+
+ Assert.assertTrue(verifyRatisReplication(bucket.getVolumeName(),
+ bucket.getName(), keyName, ReplicationType.RATIS,
+ ReplicationFactor.THREE));
+
+ StringBuilder sb = new StringBuilder(length);
+
+ // Combine all parts data, and check is it matching with get key data.
+ String part1 = new String(data);
+ String part2 = new String(data);
+ sb.append(part1);
+ sb.append(part2);
+ sb.append(part3);
+ Assert.assertEquals(sb.toString(), new String(fileContent));
+ }
+
+
+ private String initiateMultipartUpload(OzoneBucket bucket, String keyName,
+ ReplicationType replicationType, ReplicationFactor replicationFactor)
+ throws Exception {
+ OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+ replicationType, replicationFactor);
+
+ String uploadID = multipartInfo.getUploadID();
+ Assert.assertNotNull(uploadID);
+ return uploadID;
+ }
+
+ private String uploadPart(OzoneBucket bucket, String keyName, String
+ uploadID, int partNumber, byte[] data) throws Exception {
+ OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+ data.length, partNumber, uploadID);
+ ozoneOutputStream.write(data, 0,
+ data.length);
+ ozoneOutputStream.close();
+
+ OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
+ ozoneOutputStream.getCommitUploadPartInfo();
+
+ Assert.assertNotNull(omMultipartCommitUploadPartInfo);
+ Assert.assertNotNull(omMultipartCommitUploadPartInfo.getPartName());
+ return omMultipartCommitUploadPartInfo.getPartName();
+
+ }
+ private void completeMultipartUpload(OzoneBucket bucket, String keyName,
+ String uploadID, Map<Integer, String> partsMap) throws Exception {
+ OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = bucket
+ .completeMultipartUpload(keyName, uploadID, partsMap);
+
+ Assert.assertNotNull(omMultipartUploadCompleteInfo);
+ Assert.assertEquals(omMultipartUploadCompleteInfo.getBucket(), bucket
+ .getName());
+ Assert.assertEquals(omMultipartUploadCompleteInfo.getVolume(), bucket
+ .getVolumeName());
+ Assert.assertEquals(omMultipartUploadCompleteInfo.getKey(), keyName);
+ Assert.assertNotNull(omMultipartUploadCompleteInfo.getHash());
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index f5f3f1b..29e0c60 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.utils.BackgroundService;
@@ -191,8 +193,24 @@ public interface KeyManager {
*/
OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
IOException;
+ /**
+ * Commit Multipart upload part file.
+ * @param omKeyArgs
+ * @param clientID
+ * @return OmMultipartCommitUploadPartInfo
+ * @throws IOException
+ */
OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
- OmKeyArgs keyArgs, long clientID) throws IOException;
+ OmKeyArgs omKeyArgs, long clientID) throws IOException;
+ /**
+ * Complete Multipart upload Request.
+ * @param omKeyArgs
+ * @param multipartUploadList
+ * @return OmMultipartUploadCompleteInfo
+ * @throws IOException
+ */
+ OmMultipartUploadCompleteInfo completeMultipartUpload(OmKeyArgs omKeyArgs,
+ OmMultipartUploadList multipartUploadList) throws IOException;
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 614d453..1347e1c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -22,9 +22,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -43,6 +45,8 @@ import
org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.PartKeyInfo;
@@ -62,6 +66,7 @@ import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MA
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -677,6 +682,10 @@ public class KeyManagerImpl implements KeyManager {
OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(
openKey);
+ // set the data size and location info list
+ keyInfo.setDataSize(omKeyArgs.getDataSize());
+ keyInfo.updateLocationInfoList(omKeyArgs.getLocationInfoList());
+
partName = keyName + clientID;
if (multipartKeyInfo == null) {
throw new OMException("No such Multipart upload is with specified " +
@@ -729,4 +738,152 @@ public class KeyManagerImpl implements KeyManager {
}
+ @Override
+ public OmMultipartUploadCompleteInfo completeMultipartUpload(
+ OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
+ throws IOException {
+ Preconditions.checkNotNull(omKeyArgs);
+ Preconditions.checkNotNull(multipartUploadList);
+ String volumeName = omKeyArgs.getVolumeName();
+ String bucketName = omKeyArgs.getBucketName();
+ String keyName = omKeyArgs.getKeyName();
+ String uploadID = omKeyArgs.getMultipartUploadID();
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+ try {
+ String multipartKey = metadataManager.getMultipartKey(volumeName,
+ bucketName, keyName, uploadID);
+ String ozoneKey = metadataManager.getOzoneKey(volumeName, bucketName,
+ keyName);
+ OmKeyInfo keyInfo = metadataManager.getKeyTable().get(ozoneKey);
+
+ OmMultipartKeyInfo multipartKeyInfo = metadataManager
+ .getMultipartInfoTable().get(multipartKey);
+ if (multipartKeyInfo == null) {
+ throw new OMException("Complete Multipart Upload Failed: volume: " +
+ volumeName + "bucket: " + bucketName + "key: " + keyName,
+ ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
+ }
+ TreeMap<Integer, PartKeyInfo> partKeyInfoMap = multipartKeyInfo
+ .getPartKeyInfoList();
+
+ TreeMap<Integer, String> multipartMap = multipartUploadList
+ .getMultipartMap();
+
+ // Last key in the map should be having key value as size, as map's
+ // are sorted. Last entry in both maps should have partNumber as size
+ // of the map. As we have part entries 1, 2, 3, 4 and then we get
+ // complete multipart upload request so the map last entry should have 4,
+ // if it is having value greater or less than map size, then there is
+ // some thing wrong throw error.
+
+ Map.Entry<Integer, String> multipartMapLastEntry = multipartMap
+ .lastEntry();
+ Map.Entry<Integer, PartKeyInfo> partKeyInfoLastEntry = partKeyInfoMap
+ .lastEntry();
+ if (partKeyInfoMap.size() != multipartMap.size()) {
+ throw new OMException("Complete Multipart Upload Failed: volume: " +
+ volumeName + "bucket: " + bucketName + "key: " + keyName,
+ ResultCodes.MISMATCH_MULTIPART_LIST);
+ }
+
+ // Last entry part Number should be the size of the map, otherwise this
+ // means we have missing some parts but we got a complete request.
+ if (multipartMapLastEntry.getKey() != partKeyInfoMap.size() ||
+ partKeyInfoLastEntry.getKey() != partKeyInfoMap.size()) {
+ throw new OMException("Complete Multipart Upload Failed: volume: " +
+ volumeName + "bucket: " + bucketName + "key: " + keyName,
+ ResultCodes.MISSING_UPLOAD_PARTS);
+ }
+ ReplicationType type = partKeyInfoLastEntry.getValue().getPartKeyInfo()
+ .getType();
+ ReplicationFactor factor = partKeyInfoLastEntry.getValue()
+ .getPartKeyInfo().getFactor();
+ List<OmKeyLocationInfo> locations = new ArrayList<>();
+ long size = 0;
+ int partsCount =1;
+ int partsMapSize = partKeyInfoMap.size();
+ for(Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry : partKeyInfoMap
+ .entrySet()) {
+ int partNumber = partKeyInfoEntry.getKey();
+ PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue();
+ // Check we have all parts to complete multipart upload and also
+ // check partNames provided match with actual part names
+ String providedPartName = multipartMap.get(partNumber);
+ String actualPartName = partKeyInfo.getPartName();
+ if (partNumber == partsCount) {
+ if (!actualPartName.equals(providedPartName)) {
+ throw new OMException("Complete Multipart Upload Failed: volume: "
+
+ volumeName + "bucket: " + bucketName + "key: " + keyName,
+ ResultCodes.MISMATCH_MULTIPART_LIST);
+ }
+ OmKeyInfo currentPartKeyInfo = OmKeyInfo
+ .getFromProtobuf(partKeyInfo.getPartKeyInfo());
+ // Check if any part size is less than 5mb, last part can be less
+ // than 5 mb.
+ if (partsCount != partsMapSize &&
+ currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) {
+ throw new OMException("Complete Multipart Upload Failed: Entity " +
+ "too small: volume: " + volumeName + "bucket: " + bucketName
+ + "key: " + keyName, ResultCodes.ENTITY_TOO_SMALL);
+ }
+ // As all part keys will have only one version.
+ OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo
+ .getKeyLocationVersions().get(0);
+ locations.addAll(currentKeyInfoGroup.getLocationList());
+ size += currentPartKeyInfo.getDataSize();
+ } else {
+ throw new OMException("Complete Multipart Upload Failed: volume: " +
+ volumeName + "bucket: " + bucketName + "key: " + keyName,
+ ResultCodes.MISSING_UPLOAD_PARTS);
+ }
+ partsCount++;
+ }
+ if (keyInfo == null) {
+ // This is a newly added key, it does not have any versions.
+ OmKeyLocationInfoGroup keyLocationInfoGroup = new
+ OmKeyLocationInfoGroup(0, locations);
+ // A newly created key, this is the first version.
+ keyInfo = new OmKeyInfo.Builder()
+ .setVolumeName(omKeyArgs.getVolumeName())
+ .setBucketName(omKeyArgs.getBucketName())
+ .setKeyName(omKeyArgs.getKeyName())
+ .setReplicationFactor(factor)
+ .setReplicationType(type)
+ .setCreationTime(Time.now())
+ .setModificationTime(Time.now())
+ .setDataSize(size)
+ .setOmKeyLocationInfos(
+ Collections.singletonList(keyLocationInfoGroup))
+ .build();
+ } else {
+ // Already a version exists, so we should add it as a new version.
+ // But now as versioning is not supported, just following the commit
+ // key approach.
+ // When versioning support comes, then we can uncomment below code
+ // keyInfo.addNewVersion(locations);
+ keyInfo.updateLocationInfoList(locations);
+ }
+ DBStore store = metadataManager.getStore();
+ try (BatchOperation batch = store.initBatchOperation()) {
+ //Remove entry in multipart table and add a entry in to key table
+ metadataManager.getMultipartInfoTable().deleteWithBatch(batch,
+ multipartKey);
+ metadataManager.getKeyTable().putWithBatch(batch,
+ ozoneKey, keyInfo);
+ store.commitBatchOperation(batch);
+ }
+ return new OmMultipartUploadCompleteInfo(omKeyArgs.getVolumeName(),
+ omKeyArgs.getBucketName(), omKeyArgs.getKeyName(), DigestUtils
+ .sha256Hex(keyName));
+ } catch (OMException ex) {
+ throw ex;
+ } catch (IOException ex) {
+ LOG.error("Complete Multipart Upload Failed: volume: " + volumeName +
+ "bucket: " + bucketName + "key: " + keyName, ex);
+ throw new OMException(ex.getMessage(), ResultCodes
+ .COMPLETE_MULTIPART_UPLOAD_FAILED);
+ } finally {
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+ }
+ }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 715ebb8..89e1679 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -62,6 +62,7 @@ public class OMMetrics {
private @Metric MutableCounterLong numGetServiceLists;
private @Metric MutableCounterLong numListS3Buckets;
private @Metric MutableCounterLong numInitiateMultipartUploads;
+ private @Metric MutableCounterLong numCompleteMultipartUploads;
// Failure Metrics
@@ -88,6 +89,7 @@ public class OMMetrics {
private @Metric MutableCounterLong numInitiateMultipartUploadFails;
private @Metric MutableCounterLong numCommitMultipartUploadParts;
private @Metric MutableCounterLong getNumCommitMultipartUploadPartFails;
+ private @Metric MutableCounterLong numCompleteMultipartUploadFails;
// Metrics for total number of volumes, buckets and keys
@@ -247,6 +249,15 @@ public class OMMetrics {
numInitiateMultipartUploadFails.incr();
}
+ public void incNumCompleteMultipartUploads() {
+ numKeyOps.incr();
+ numCompleteMultipartUploads.incr();
+ }
+
+ public void incNumCompleteMultipartUploadFails() {
+ numCompleteMultipartUploadFails.incr();
+ }
+
public void incNumGetServiceLists() {
numGetServiceLists.incr();
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 4fc0813..941b80c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -1645,6 +1647,31 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
return commitUploadPartInfo;
}
+ @Override
+ public OmMultipartUploadCompleteInfo completeMultipartUpload(
+ OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
+ throws IOException {
+ OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo;
+ metrics.incNumCompleteMultipartUploads();
+
+ Map<String, String> auditMap = (omKeyArgs == null) ? new LinkedHashMap<>()
:
+ omKeyArgs.toAuditMap();
+ auditMap.put(OzoneConsts.MULTIPART_LIST, multipartUploadList
+ .getMultipartMap().toString());
+ try {
+ omMultipartUploadCompleteInfo = keyManager.completeMultipartUpload(
+ omKeyArgs, multipartUploadList);
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
+ .COMPLETE_MULTIPART_UPLOAD, auditMap));
+ return omMultipartUploadCompleteInfo;
+ } catch (IOException ex) {
+ metrics.incNumCompleteMultipartUploadFails();
+ AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
+ .COMPLETE_MULTIPART_UPLOAD, auditMap, ex));
+ throw ex;
+ }
+ }
+
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 6d93a78..58f7531 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -119,6 +119,10 @@ public class OMException extends IOException {
INITIATE_MULTIPART_UPLOAD_FAILED,
NO_SUCH_MULTIPART_UPLOAD,
UPLOAD_PART_FAILED,
+ MISMATCH_MULTIPART_LIST,
+ MISSING_UPLOAD_PARTS,
+ COMPLETE_MULTIPART_UPLOAD_FAILED,
+ ENTITY_TOO_SMALL,
INVALID_REQUEST;
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 33453ac..de88bc6 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -101,6 +103,10 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .MultipartUploadCompleteRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .MultipartUploadCompleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateResponse;
@@ -108,6 +114,7 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.RenameKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -149,6 +156,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+import java.util.TreeMap;
import java.util.stream.Collectors;
/**
@@ -331,6 +339,13 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
responseBuilder.setCommitMultiPartUploadResponse(
commitUploadPartResponse);
break;
+ case CompleteMultiPartUpload:
+ MultipartUploadCompleteResponse completeMultipartUploadResponse =
+ completeMultipartUpload(
+ request.getCompleteMultiPartUploadRequest());
+ responseBuilder.setCompleteMultiPartUploadResponse(
+ completeMultipartUploadResponse);
+ break;
case ServiceList:
ServiceListResponse serviceListResponse = getServiceList(
request.getServiceListRequest());
@@ -392,7 +407,14 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
return Status.NO_SUCH_MULTIPART_UPLOAD_ERROR;
case UPLOAD_PART_FAILED:
return Status.MULTIPART_UPLOAD_PARTFILE_ERROR;
-
+ case COMPLETE_MULTIPART_UPLOAD_FAILED:
+ return Status.COMPLETE_MULTIPART_UPLOAD_ERROR;
+ case MISMATCH_MULTIPART_LIST:
+ return Status.MISMATCH_MULTIPART_LIST;
+ case MISSING_UPLOAD_PARTS:
+ return Status.MISSING_UPLOAD_PARTS;
+ case ENTITY_TOO_SMALL:
+ return Status.ENTITY_TOO_SMALL;
default:
return Status.INTERNAL_ERROR;
}
@@ -839,6 +861,10 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
.setMultipartUploadID(keyArgs.getMultipartUploadID())
.setIsMultipartKey(keyArgs.getIsMultipartKey())
.setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
+ .setDataSize(keyArgs.getDataSize())
+ .setLocationInfoList(keyArgs.getKeyLocationsList().stream()
+ .map(OmKeyLocationInfo::getFromProtobuf)
+ .collect(Collectors.toList()))
.build();
OmMultipartCommitUploadPartInfo commitUploadPartInfo =
impl.commitMultipartUploadPart(omKeyArgs, request.getClientID());
@@ -849,4 +875,42 @@ public class OzoneManagerProtocolServerSideTranslatorPB
implements
}
return resp.build();
}
+
+
+ private MultipartUploadCompleteResponse completeMultipartUpload(
+ MultipartUploadCompleteRequest request) {
+ MultipartUploadCompleteResponse.Builder response =
+ MultipartUploadCompleteResponse.newBuilder();
+
+ try {
+ KeyArgs keyArgs = request.getKeyArgs();
+ List<Part> partsList = request.getPartsListList();
+
+ TreeMap<Integer, String> partsMap = new TreeMap<>();
+ for (Part part : partsList) {
+ partsMap.put(part.getPartNumber(), part.getPartName());
+ }
+
+ OmMultipartUploadList omMultipartUploadList =
+ new OmMultipartUploadList(partsMap);
+
+ OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+ .setVolumeName(keyArgs.getVolumeName())
+ .setBucketName(keyArgs.getBucketName())
+ .setKeyName(keyArgs.getKeyName())
+ .setMultipartUploadID(keyArgs.getMultipartUploadID())
+ .build();
+ OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = impl
+ .completeMultipartUpload(omKeyArgs, omMultipartUploadList);
+
+ response.setVolume(omMultipartUploadCompleteInfo.getVolume())
+ .setBucket(omMultipartUploadCompleteInfo.getBucket())
+ .setKey(omMultipartUploadCompleteInfo.getKey())
+ .setHash(omMultipartUploadCompleteInfo.getHash());
+ response.setStatus(Status.OK);
+ } catch (IOException ex) {
+ response.setStatus(exceptionToResponseStatus(ex));
+ }
+ return response.build();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]