[
https://issues.apache.org/jira/browse/HADOOP-13786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003372#comment-16003372
]
Aaron Fabbri commented on HADOOP-13786:
---------------------------------------
Been spending some time reviewing the patch. It is pretty large, so I haven't
covered 100%, but here are some comments so far:
{code}
+ * This method was copied from
+ * {@code org.apache.hadoop.registry.client.binding.JsonSerDeser}.
+ * @param <T> Type to marshal.
+ */
{code}
Can we move the dependency to a common place instead of copying?
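Something like a generic JSON helper in hadoop-common would let the registry and the committer share one implementation. Rough sketch of what I mean below; the class name and package are just placeholders, not existing code:
{code}
// Rough sketch only: a shared generic JSON (de)serializer that could live in a
// common module. Class name and package are placeholders, not existing classes.
package org.apache.hadoop.util;

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSerialization<T> {
  private final Class<T> classType;
  private final ObjectMapper mapper = new ObjectMapper();

  public JsonSerialization(Class<T> classType) {
    this.classType = classType;
  }

  /** Serialize an instance to a JSON string. */
  public String toJson(T instance) throws IOException {
    return mapper.writeValueAsString(instance);
  }

  /** Deserialize a JSON string back to an instance of T. */
  public T fromJson(String json) throws IOException {
    return mapper.readValue(json, classType);
  }
}
{code}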
{code}
/**
+ * A factory for committers implementing the {@link PathOutputCommitter}
+ * methods, and so can be used from {@link FileOutputFormat}.
+ * The base implementation returns {@link FileOutputFormat} instances.
+ */
+public class PathOutputCommitterFactory extends Configured {
{code}
In the comment, did you mean "FileOutputCommitter instances"?
{code}
bytes = putObject();
}
} else {
- // there has already been at least one block scheduled for upload;
- // put up the current then wait
- if (hasBlock && block.hasData()) {
+ // there's an MPU in progress';
+ // IF there is more data to upload, or no data has yet been uploaded,
+ // PUT the final block
+ if (hasBlock &&
+ (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
{code}
This is to allow for zero-byte files, right? Looks like you had to make similar
changes elsewhere. Did you cover this new behavior with a test case?
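For reference, the kind of regression test I'm imagining (sketch only, not from the patch; JUnit asserts plus the usual contract-test helpers getFileSystem()/path() assumed, names are placeholders):
{code}
// Sketch of a zero-byte-file regression test.
@Test
public void testZeroByteFileViaBlockOutputStream() throws Throwable {
  FileSystem fs = getFileSystem();
  Path file = path("testZeroByteFile");
  // create and immediately close: no data is ever written, but close() must
  // still complete the upload and leave a visible zero-length object behind.
  fs.create(file, true).close();
  FileStatus status = fs.getFileStatus(file);
  assertTrue("not a file: " + status, status.isFile());
  assertEquals("file length of " + status, 0, status.getLen());
}
{code}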
{code}
+ public void abortOutstandingMultipartUploads(long seconds)
+ throws IOException {
+ Preconditions.checkArgument(seconds >=0);
+ Date purgeBefore =
+ new Date(new Date().getTime() - seconds * 1000);
+ LOG.debug("Purging outstanding multipart uploads older than {}",
+ purgeBefore);
+ try {
+ transfers.abortMultipartUploads(bucket, purgeBefore);
+ } catch (AmazonServiceException e) {
+ throw translateException("purging multipart uploads", bucket, e);
+ }
{code}
Thinking about how multipart purge interacts with the magic committer.
Currently someone can configure fs.s3a.multipart.purge.age to 0. I'm wondering
if we should increase the minimum, or log a warning when the magic committer is
also enabled? Maybe it at least deserves some special treatment in the docs?
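The warning could be as simple as a check at init time; sketch below (the committer-enabled key name is something I made up for illustration, only fs.s3a.multipart.purge.age is the real option):
{code}
// Hypothetical warning during S3A initialization; "fs.s3a.committer.magic.enabled"
// is an invented flag name for illustration. Purge age is in seconds.
private void warnOnRiskyPurgeAge(Configuration conf) {
  long purgeAge = conf.getLong("fs.s3a.multipart.purge.age", 86400);
  boolean magicCommitter =
      conf.getBoolean("fs.s3a.committer.magic.enabled", false);
  if (magicCommitter && purgeAge == 0) {
    LOG.warn("fs.s3a.multipart.purge.age is 0 while the magic committer is"
        + " enabled; purging may abort uploads of pending commits");
  }
}
{code}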
{code}
* Returns the S3 client used by this filesystem.
* @return AmazonS3Client
*/
- AmazonS3 getAmazonS3Client() {
+ public AmazonS3 getAmazonS3Client() {
return s3;
}
{code}
Not @VisibleForTesting? Looks like you just want to subclass/override. Can we
make it protected instead?
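i.e. something along these lines (shown out of context; s3 is the existing field):
{code}
// Sketch of the suggestion: keep the accessor non-public but overridable.
@VisibleForTesting
protected AmazonS3 getAmazonS3Client() {
  return s3;
}
{code}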
{code}
* @param key input key
* @return the fully qualified path including URI scheme and bucket name.
*/
- Path keyToQualifiedPath(String key) {
+ public Path keyToQualifiedPath(String key) {
{code}
Should we annotate the interface stability or audience here?
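Something like the standard annotations, so downstream users know these newly-public methods aren't a stable API. Sketch only; the class name below is a placeholder:
{code}
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

// Sketch: the annotations I have in mind, applied at whatever scope makes
// sense (the class, or the individual methods being widened to public).
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class WriteOperationHelperSketch {
  // the newly-public helper methods would sit behind this
}
{code}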
{code}
* @return the upload result containing the ID
* @throws IOException IO problem
*/
- String initiateMultiPartUpload() throws IOException {
+ public String initiateMultiPartUpload() throws IOException {
LOG.debug("Initiating Multipart upload");
final InitiateMultipartUploadRequest initiateMPURequest =
new InitiateMultipartUploadRequest(bucket,
{code}
Audience annotation?
{code}
+ /**
+ * Finalize a multipart PUT operation.
+ * This completes the upload, and, if that works, calls
+ * {@link #finishedWrite(String, long)} to update the filesystem.
+ * @param uploadId multipart operation Id
+ * @param partETags list of partial uploads
+ * @return the result of the operation.
+ * @throws IOException on problems.
+ */
{code}
Semi-related: I've been thinking of splitting up S3AFileSystem in the
future. Seems like we could separate WriteOperationHelper, and maybe put
other S3-level operations (i.e. below anything that cares about MetadataStore)
into a separate class.
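Very rough sketch of the split I'm imagining (all names hypothetical): a narrow interface for the raw S3 calls, sitting below anything MetadataStore-aware, which WriteOperationHelper and the committers could code against.
{code}
import java.io.IOException;
import java.util.List;

import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;

// Hypothetical interface; names are placeholders, not existing classes.
public interface RawS3Operations {
  PutObjectResult putObject(PutObjectRequest request) throws IOException;
  String initiateMultiPartUpload(String key) throws IOException;
  void completeMultipartUpload(String key, String uploadId,
      List<PartETag> partETags) throws IOException;
  void abortMultipartUpload(MultipartUpload upload) throws IOException;
  List<MultipartUpload> listMultipartUploads(String prefix) throws IOException;
}
{code}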
{code}
+ public int abortMultipartUploadsUnderPath(String prefix)
+ throws IOException {
+ int count = 0;
+ for (MultipartUpload upload : listMultipartUploads(prefix)) {
+ try {
+ abortMultipartCommit(upload);
+ count++;
+ } catch (FileNotFoundException e) {
+ LOG.debug("Already aborted: {}", upload.getKey(), e);
{code}
I wonder if we'll need to deal with retryable exceptions here, instead of
propagating.
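e.g. wrapping each abort in a small bounded-retry helper, so transient throttling gets another chance while persistent failures still propagate. Sketch only; the helper, attempt count and behavior are made up for illustration, not part of the patch:
{code}
// Hypothetical bounded-retry helper (assumes java.util.concurrent.Callable,
// Guava Preconditions and the class's LOG are available).
private <T> T retry(String action, int attempts, Callable<T> operation)
    throws IOException {
  Preconditions.checkArgument(attempts >= 1, "attempts must be >= 1");
  IOException lastFailure = null;
  for (int i = 0; i < attempts; i++) {
    try {
      return operation.call();
    } catch (IOException e) {
      lastFailure = e;
      LOG.debug("Attempt {}/{} of {} failed", i + 1, attempts, action, e);
    } catch (Exception e) {
      throw new IOException(action + " failed", e);
    }
  }
  throw lastFailure;
}
{code}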
{code}
+ * @return the upload initiated
+ * @throws IOException on problems
+ */
+ public PutObjectResult putObjectAndFinalize(
+ PutObjectRequest putObjectRequest,
+ int length) throws IOException {
+ PutObjectResult result = putObject(putObjectRequest);
+ finishedWrite(putObjectRequest.getKey(), length);
{code}
Also wondering about error handling / retries here. I don't know if there
is a difference between direct put and TransferManager in terms of built-in
retry logic.
{code}
+ /**
+ * Revert a commit by deleting the file.
+ * TODO: Policy regarding creating a mock empty parent directory.
+ * @param destKey destination key
+ * @throws IOException due to inability to delete a directory or file.
+ */
+ public void revertCommit(String destKey) throws IOException {
{code}
Does the directory need to exist after revert? Can we just say it doesn't?
{code}
+ /**
+ * Delete a path quietly: failures are logged at DEBUG.
+ * @param fs filesystem
+ * @param path path
+ * @param recursive recursive?
+ */
+ public static void deleteWithWarning(FileSystem fs,
{code}
nit: s/failures are logged at DEBUG/failures are logged at WARN/, to match the
method name deleteWithWarning().
{code}
+public abstract class Abstract3GuardCommitterFactory
{code}
"AbstractS3GuardCommitterFactory" (Missing the 'S')
{code}
+ public static List<String> splitPathToElements(Path path) {
+ String uriPath = path.toUri().getPath();
+ checkArgument(!uriPath.isEmpty(), "empty path");
+ checkArgument(uriPath.charAt(0) == '/', "path is relative");
+ if ("/".equals(uriPath)) {
+ // special case: empty list
+ return new ArrayList<>(0);
+ }
+ path.depth();
{code}
Meant to delete this last line?
{code}
+ /**
+ * Verify that that path is a delayed commit path.
{code}
nit: s/that that/that the/, or just drop the duplicated "that".
{code}
+
+/**
+ * Adds the code needed for S3A integration.
+ * It's pulled out to keep S3A FS class slightly less complex.
{code}
Thanks for the separation here.
{code}
+ * Configuration options:
+ * <pre>
+ * : temporary local FS directory
+ * : intermediate directory on a cluster-wide FS (can be HDFS or a consistent
+ * s3 endpoint).
{code}
Couldn't parse this comment. Is there supposed to be a config name before the
':'?
{code}
+ public void testConcurrentCommitTaskWithSubDir() throws Exception {
+ Job job = newJob();
+ FileOutputFormat.setOutputPath(job, outDir);
+ final Configuration conf = job.getConfiguration();
+/*
+
+ conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+ FileSystem.closeAll();
+*/
{code}
There are a couple of spots with commented-out code and TODOs still. Note to
clean these up.
{code}
+ // if this fails with "directory is already locked" set umask to 0022
+ cluster = new MiniDFSCluster(conf, 1, true, null);
+ //cluster = new MiniDFSCluster.Builder(new Configuration()).build();
{code}
ditto.
{code}
+/*
+ @Override
+ protected void deleteObject(String key) throws InvalidRequestException {
+ // force into sharing the existing mock entry point
+ getAmazonS3Client().deleteObject(new DeleteObjectRequest(getBucket(),
key));
+ }
+*/
{code}
same.
{code}
+
+ @Override
+ public void setFaults(Faults... faults) {
+ injection.setFaults(faults);
{code}
I only saw this used once, in AbstractITCommitProtocol, which sets a commitJob
failure. Curious to hear your plans for more fault injection. Hope we can
discuss in a call soon.
Finally, a reminder that your log4j.properties diff will need to be commented
out or removed eventually. The settings here are useful for testing and dev
though.
> Add S3Guard committer for zero-rename commits to consistent S3 endpoints
> ------------------------------------------------------------------------
>
> Key: HADOOP-13786
> URL: https://issues.apache.org/jira/browse/HADOOP-13786
> Project: Hadoop Common
> Issue Type: New Feature
> Components: fs/s3
> Affects Versions: HADOOP-13345
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Attachments: HADOOP-13786-HADOOP-13345-001.patch,
> HADOOP-13786-HADOOP-13345-002.patch, HADOOP-13786-HADOOP-13345-003.patch,
> HADOOP-13786-HADOOP-13345-004.patch, HADOOP-13786-HADOOP-13345-005.patch,
> HADOOP-13786-HADOOP-13345-006.patch, HADOOP-13786-HADOOP-13345-006.patch,
> HADOOP-13786-HADOOP-13345-007.patch, HADOOP-13786-HADOOP-13345-009.patch,
> HADOOP-13786-HADOOP-13345-010.patch, HADOOP-13786-HADOOP-13345-011.patch,
> HADOOP-13786-HADOOP-13345-012.patch, HADOOP-13786-HADOOP-13345-013.patch,
> HADOOP-13786-HADOOP-13345-015.patch, HADOOP-13786-HADOOP-13345-016.patch,
> HADOOP-13786-HADOOP-13345-017.patch, HADOOP-13786-HADOOP-13345-018.patch,
> HADOOP-13786-HADOOP-13345-019.patch, HADOOP-13786-HADOOP-13345-020.patch,
> HADOOP-13786-HADOOP-13345-021.patch, HADOOP-13786-HADOOP-13345-022.patch,
> HADOOP-13786-HADOOP-13345-023.patch, HADOOP-13786-HADOOP-13345-024.patch,
> HADOOP-13786-HADOOP-13345-025.patch, objectstore.pdf, s3committer-master.zip
>
>
> A goal of this code is "support O(1) commits to S3 repositories in the
> presence of failures". Implement it, including whatever is needed to
> demonstrate the correctness of the algorithm. (that is, assuming that s3guard
> provides a consistent view of the presence/absence of blobs, show that we can
> commit directly).
> I consider ourselves free to expose the blobstore-ness of the s3 output
> streams (ie. not visible until the close()), if we need to use that to allow
> us to abort commit operations.