[ 
https://issues.apache.org/jira/browse/HADOOP-17414?focusedWorklogId=533314&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-533314
 ]

ASF GitHub Bot logged work on HADOOP-17414:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jan/21 01:41
            Start Date: 09/Jan/21 01:41
    Worklog Time Spent: 10m 
      Work Description: sunchao commented on a change in pull request #2530:
URL: https://github.com/apache/hadoop/pull/2530#discussion_r554269175



##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import com.amazonaws.services.s3.Headers;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.HEADER_PREFIX;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+
+/**
+ * Part of the S3A FS where object headers are
+ * processed.
+ * Implements all the various XAttr read operations.
+ * Those APIs all expect byte arrays back.
+ * Metadata cloning is also implemented here, so as
+ * to stay in sync with custom header logic.
+ */
+public class HeaderProcessing extends AbstractStoreOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      HeaderProcessing.class);
+
+  private static final byte[] EMPTY = new byte[0];
+
+  /**
+   * Length XAttr.
+   */
+  public static final String XA_CONTENT_LENGTH =
+      HEADER_PREFIX + Headers.CONTENT_LENGTH;
+
+  /**
+   * last modified XAttr.
+   */
+  public static final String XA_LAST_MODIFIED =
+      HEADER_PREFIX + Headers.LAST_MODIFIED;
+
+  public static final String XA_CONTENT_DISPOSITION =
+      HEADER_PREFIX + Headers.CONTENT_DISPOSITION;
+
+  public static final String XA_CONTENT_ENCODING =
+      HEADER_PREFIX + Headers.CONTENT_ENCODING;
+
+  public static final String XA_CONTENT_LANGUAGE =
+      HEADER_PREFIX + Headers.CONTENT_LANGUAGE;
+
+  public static final String XA_CONTENT_MD5 =
+      HEADER_PREFIX + Headers.CONTENT_MD5;
+
+  public static final String XA_CONTENT_RANGE =
+      HEADER_PREFIX + Headers.CONTENT_RANGE;
+
+  public static final String XA_CONTENT_TYPE =
+      HEADER_PREFIX + Headers.CONTENT_TYPE;
+
+  public static final String XA_ETAG = HEADER_PREFIX + Headers.ETAG;
+
+  public HeaderProcessing(final StoreContext storeContext) {
+    super(storeContext);
+  }
+
+  /**
+   * Query the store, get all the headers into a map. Each Header
+   * has the "header." prefix.
+   * Caller must have read access.
+   * The value of each header is the string value of the object
+   * UTF-8 encoded.
+   * @param path path of object.
+   * @return the headers
+   * @throws IOException failure, including file not found.
+   */
+  private Map<String, byte[]> retrieveHeaders(Path path) throws IOException {

Review comment:
       nit: I wonder if some kind of caching will be useful here. We are 
calling `getObjectMetadata` for every `getXAttr` call.
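
A minimal sketch of what such caching could look like, assuming a small time-bounded cache in front of the header lookup. `HeaderCache`, its `loader` function, the TTL and the injected clock are all hypothetical names for illustration; none of them are in the PR. The real `retrieveHeaders` throws `IOException`, which a plain `Function` cannot propagate, so a real version would need its own loader interface.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of the PR): caches the header map
 * of each path for a fixed time so that repeated XAttr reads do not
 * each trigger a metadata request.
 */
final class HeaderCache {

  /** A cached header map plus the time (in ms) at which it goes stale. */
  private static final class Entry {
    private final Map<String, byte[]> headers;
    private final long expiresAt;

    Entry(Map<String, byte[]> headers, long expiresAt) {
      this.headers = headers;
      this.expiresAt = expiresAt;
    }
  }

  private final ConcurrentHashMap<String, Entry> entries =
      new ConcurrentHashMap<>();
  private final Function<String, Map<String, byte[]>> loader;
  private final long ttlMillis;
  private final LongSupplier clock;

  /**
   * @param loader stand-in for the store lookup (assumption).
   * @param ttlMillis how long an entry stays valid.
   * @param clock source of the current time in ms; injectable for tests.
   */
  HeaderCache(Function<String, Map<String, byte[]>> loader,
      long ttlMillis,
      LongSupplier clock) {
    this.loader = loader;
    this.ttlMillis = ttlMillis;
    this.clock = clock;
  }

  /**
   * Return the headers of a path, loading them only when there is
   * no entry or the entry has expired. Two racing threads may both
   * load the same path; the later result simply overwrites the earlier.
   */
  Map<String, byte[]> get(String path) {
    long now = clock.getAsLong();
    Entry entry = entries.get(path);
    if (entry == null || now >= entry.expiresAt) {
      entry = new Entry(loader.apply(path), now + ttlMillis);
      entries.put(path, entry);
    }
    return entry.headers;
  }
}
```

The trade-off is staleness: an object overwritten within the TTL would still report its old headers, so any such cache would have to be short-lived or invalidated on writes.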

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -330,6 +331,11 @@
    */
   private DirectoryPolicy directoryPolicy;
 
+  /**
+   * Header processing for XAttr. Created on demand.

Review comment:
       nit: is "Created on demand" accurate? It is created in `initialize`.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
##########
@@ -1873,11 +1873,9 @@
 
 <property>
   <name>fs.s3a.committer.magic.enabled</name>
-  <value>false</value>
+  <value>true</value>

Review comment:
       I also wonder if this (along with the documentation change) is required 
for this PR.

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import com.amazonaws.services.s3.Headers;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.HEADER_PREFIX;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+
+/**
+ * Part of the S3A FS where object headers are
+ * processed.
+ * Implements all the various XAttr read operations.
+ * Those APIs all expect byte arrays back.
+ * Metadata cloning is also implemented here, so as
+ * to stay in sync with custom header logic.
+ */
+public class HeaderProcessing extends AbstractStoreOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      HeaderProcessing.class);
+
+  private static final byte[] EMPTY = new byte[0];
+
+  /**
+   * Length XAttr.
+   */
+  public static final String XA_CONTENT_LENGTH =
+      HEADER_PREFIX + Headers.CONTENT_LENGTH;
+
+  /**
+   * last modified XAttr.
+   */
+  public static final String XA_LAST_MODIFIED =
+      HEADER_PREFIX + Headers.LAST_MODIFIED;
+
+  public static final String XA_CONTENT_DISPOSITION =

Review comment:
       nit: to be consistent with the above, should we add comments for the 
following constants?

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import com.amazonaws.services.s3.Headers;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.HEADER_PREFIX;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+
+/**
+ * Part of the S3A FS where object headers are
+ * processed.
+ * Implements all the various XAttr read operations.
+ * Those APIs all expect byte arrays back.
+ * Metadata cloning is also implemented here, so as
+ * to stay in sync with custom header logic.
+ */
+public class HeaderProcessing extends AbstractStoreOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      HeaderProcessing.class);
+
+  private static final byte[] EMPTY = new byte[0];
+
+  /**
+   * Length XAttr.
+   */
+  public static final String XA_CONTENT_LENGTH =
+      HEADER_PREFIX + Headers.CONTENT_LENGTH;
+
+  /**
+   * last modified XAttr.
+   */
+  public static final String XA_LAST_MODIFIED =
+      HEADER_PREFIX + Headers.LAST_MODIFIED;
+
+  public static final String XA_CONTENT_DISPOSITION =
+      HEADER_PREFIX + Headers.CONTENT_DISPOSITION;
+
+  public static final String XA_CONTENT_ENCODING =
+      HEADER_PREFIX + Headers.CONTENT_ENCODING;
+
+  public static final String XA_CONTENT_LANGUAGE =
+      HEADER_PREFIX + Headers.CONTENT_LANGUAGE;
+
+  public static final String XA_CONTENT_MD5 =
+      HEADER_PREFIX + Headers.CONTENT_MD5;
+
+  public static final String XA_CONTENT_RANGE =
+      HEADER_PREFIX + Headers.CONTENT_RANGE;
+
+  public static final String XA_CONTENT_TYPE =
+      HEADER_PREFIX + Headers.CONTENT_TYPE;
+
+  public static final String XA_ETAG = HEADER_PREFIX + Headers.ETAG;
+
+  public HeaderProcessing(final StoreContext storeContext) {
+    super(storeContext);
+  }
+
+  /**
+   * Query the store, get all the headers into a map. Each Header
+   * has the "header." prefix.
+   * Caller must have read access.
+   * The value of each header is the string value of the object
+   * UTF-8 encoded.
+   * @param path path of object.
+   * @return the headers
+   * @throws IOException failure, including file not found.
+   */
+  private Map<String, byte[]> retrieveHeaders(Path path) throws IOException {
+    StoreContext context = getStoreContext();
+    ObjectMetadata md = context.getContextAccessors()
+        .getObjectMetadata(path);
+    Map<String, String> rawHeaders = md.getUserMetadata();
+    Map<String, byte[]> headers = new TreeMap<>();
+    rawHeaders.forEach((key, value) ->
+        headers.put(HEADER_PREFIX + key, encodeBytes(value)));
+    // and add the usual content length &c, if set
+    headers.put(XA_CONTENT_DISPOSITION,
+        encodeBytes(md.getContentDisposition()));
+    headers.put(XA_CONTENT_ENCODING,
+        encodeBytes(md.getContentEncoding()));
+    headers.put(XA_CONTENT_LANGUAGE,
+        encodeBytes(md.getContentLanguage()));
+    headers.put(XA_CONTENT_LENGTH,
+        encodeBytes(md.getContentLength()));
+    headers.put(
+        XA_CONTENT_MD5,
+        encodeBytes(md.getContentMD5()));
+    headers.put(XA_CONTENT_RANGE,
+        encodeBytes(md.getContentRange()));
+    headers.put(XA_CONTENT_TYPE,
+        encodeBytes(md.getContentType()));
+    headers.put(XA_ETAG,
+        encodeBytes(md.getETag()));
+    headers.put(XA_LAST_MODIFIED,
+        encodeBytes(md.getLastModified()));
+    return headers;
+  }
+
+  /**
+   * Stringify an object and return its bytes in UTF-8 encoding.
+   * @param s source
+   * @return encoded object or null
+   */
+  public static byte[] encodeBytes(Object s) {
+    return s == null
+        ? EMPTY
+        : s.toString().getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Get the string value from the bytes.
+   * if null : return null, otherwise the UTF-8 decoded
+   * bytes.
+   * @param bytes source bytes
+   * @return decoded value
+   */
+  public static String decodeBytes(byte[] bytes) {
+    return bytes == null
+        ? null
+        : new String(bytes, StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Get an XAttr name and value for a file or directory.
+   * @param path Path to get extended attribute
+   * @param name XAttr name.
+   * @return byte[] XAttr value or null
+   * @throws IOException IO failure
+   * @throws UnsupportedOperationException if the operation is unsupported
+   *         (default outcome).
+   */
+  public byte[] getXAttr(Path path, String name) throws IOException {
+    return retrieveHeaders(path).get(name);
+  }
+
+  /**
+   * See {@code FileSystem.getXAttrs(path}.
+   *
+   * @param path Path to get extended attributes
+   * @return Map describing the XAttrs of the file or directory
+   * @throws IOException IO failure
+   * @throws UnsupportedOperationException if the operation is unsupported
+   *         (default outcome).
+   */
+  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+    return retrieveHeaders(path);
+  }
+
+  /**
+   * See {@code FileSystem.listXAttrs(path)}.
+   * @param path Path to get extended attributes
+   * @return List of supported XAttrs
+   * @throws IOException IO failure
+   */
+  public List<String> listXAttrs(final Path path) throws IOException {
+    return new ArrayList<>(retrieveHeaders(path).keySet());
+  }
+
+  /**
+   * See {@code FileSystem.getXAttrs(path, names}.
+   * @param path Path to get extended attributes
+   * @param names XAttr names.
+   * @return Map describing the XAttrs of the file or directory
+   * @throws IOException IO failure
+   */
+  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
+      throws IOException {
+    Map<String, byte[]> headers = retrieveHeaders(path);
+    Map<String, byte[]> result = new TreeMap<>();
+    headers.entrySet().stream()
+        .filter(entry -> names.contains(entry.getKey()))
+        .forEach(entry -> result.put(entry.getKey(), entry.getValue()));
+    return result;
+  }
+
+  /**
+   * Convert an XAttr byte array to a long.
+   * testability.
+   * @param data data to parse
+   * @return either a length or none
+   */
+  public static Optional<Long> extractXAttrLongValue(byte[] data) {
+    String xAttr;
+    xAttr = HeaderProcessing.decodeBytes(data);
+    if (StringUtils.isNotEmpty(xAttr)) {
+      try {
+        long l = Long.parseLong(xAttr);
+        if (l >= 0) {
+          return Optional.of(l);
+        }
+      } catch (NumberFormatException ex) {
+        LOG.warn("Not a number: {}", xAttr);
+      }
+    }
+    // missing/empty header or parse failure.
+    return Optional.empty();
+  }
+
+  /**
+   * Creates a copy of the passed {@link ObjectMetadata}.
+   * Does so without using the {@link ObjectMetadata#clone()} method,
+   * to avoid copying unnecessary headers.
+   * This operation does not copy the {@code X_HEADER_MAGIC_MARKER}
+   * header to avoid confusion. If a marker file is renamed,
+   * it loses information about any remapped file.
+   * @param source the {@link ObjectMetadata} to copy
+   * @param ret the metadata to update; this is the return value.
+   * @return a copy of {@link ObjectMetadata} with only relevant attributes

Review comment:
       Hmm, why do we need a return value if `ret` is already changed in place?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 533314)
    Time Spent: 4.5h  (was: 4h 20m)

> Magic committer files don't have the count of bytes written collected by spark
> ------------------------------------------------------------------------------
>
>                 Key: HADOOP-17414
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17414
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.2.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The Spark statistics tracking doesn't correctly assess the size of the 
> uploaded files, as it only calls getFileStatus on the zero-byte objects, not 
> the yet-to-manifest files. Which, given they don't exist yet, isn't easy to 
> do.
> Solution: 
> * Add getXAttr and listXAttr API calls to S3AFileSystem.
> * Return all S3 object headers as XAttr attributes prefixed "header.". That 
> covers both custom and standard headers (e.g. header.Content-Length).
> The setXAttr call isn't implemented, so for correctness the FS doesn't
> declare its support for the API in hasPathCapability().
> The magic commit file write sets the custom header
> x-hadoop-s3a-magic-data-length in the marker file to the length of the
> final data.
> A matching patch in Spark will look for the XAttr
> "header.x-hadoop-s3a-magic-data-length" when the file
> being probed for output data is zero bytes long. 
> As a result, the job tracking statistics will report the
> bytes written but yet to be manifest.
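
The probe described above can be sketched as follows. This is an illustration only, not the Spark patch: `MagicLength`, `parseLength` and `effectiveLength` are hypothetical names, and the XAttr value is passed in as a byte array (as would be returned by a `getXAttr` call) so that the example needs no Hadoop classes. The parsing follows the same rules as `extractXAttrLongValue` in the diff: UTF-8 decode, then accept only a non-negative long.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical helper showing the zero-byte fallback logic. */
final class MagicLength {

  /** XAttr name, as given in the issue description. */
  static final String XA_MAGIC_MARKER =
      "header.x-hadoop-s3a-magic-data-length";

  private MagicLength() {
  }

  /**
   * Parse an XAttr value as a non-negative long.
   * @param data UTF-8 encoded attribute value; may be null or empty.
   * @return the length, or empty if missing, negative or not a number.
   */
  static Optional<Long> parseLength(byte[] data) {
    if (data == null || data.length == 0) {
      return Optional.empty();
    }
    try {
      long l = Long.parseLong(new String(data, StandardCharsets.UTF_8));
      return l >= 0 ? Optional.of(l) : Optional.empty();
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }

  /**
   * Choose the length to report for a file.
   * @param statusLength length reported by the file status.
   * @param xattrValue value of the XA_MAGIC_MARKER attribute, if any.
   * @return the status length when it is positive; otherwise the
   * length declared in the attribute, or 0 if there is none.
   */
  static long effectiveLength(long statusLength, byte[] xattrValue) {
    if (statusLength > 0) {
      return statusLength;
    }
    return parseLength(xattrValue).orElse(0L);
  }
}
```

Only zero-byte files consult the attribute, so ordinary files are unaffected by a stray or malformed header.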



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
