[
https://issues.apache.org/jira/browse/HADOOP-19120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17829862#comment-17829862
]
ASF GitHub Bot commented on HADOOP-19120:
-----------------------------------------
anmolanmol1234 commented on code in PR #6633:
URL: https://github.com/apache/hadoop/pull/6633#discussion_r1535503329
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/HttpOperation.java:
##########
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+
+/**
+ * Base Http operation class for orchestrating server IO calls. Child classes
would
+ * define the certain orchestration implementation on the basis of network
library used.
+ * <p>
+ * For JDK netlib usage, the child class would be {@link AbfsHttpOperation}.
<br>
+ * For ApacheHttpClient netlib usage, the child class would be {@link
AbfsAHCHttpOperation}.
+ * </p>
+ */
+public abstract class HttpOperation implements AbfsPerfLoggable {
+
+ private final Logger log;
+
+ private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;
+
+ private static final int ONE_THOUSAND = 1000;
+
+ private static final int ONE_MILLION = ONE_THOUSAND * ONE_THOUSAND;
+
+ private String method;
+
+ private URL url;
+
+ private String maskedUrl;
+
+ private String maskedEncodedUrl;
+
+ private int statusCode;
+
+ private String statusDescription;
+
+ private String storageErrorCode = "";
+
+ private String storageErrorMessage = "";
+
+ private String requestId = "";
+
+ private String expectedAppendPos = "";
+
+ private ListResultSchema listResultSchema = null;
+
+ // metrics
+ private int bytesSent;
+
+ private int expectedBytesToBeSent;
+
+ private long bytesReceived;
+
+ private long connectionTimeMs;
+
+ private long sendRequestTimeMs;
+
+ private long recvResponseTimeMs;
+
+ private boolean shouldMask = false;
+
+ public HttpOperation(Logger logger,
+ final URL url,
+ final String method,
+ final int httpStatus) {
+ this.log = logger;
+ this.url = url;
+ this.method = method;
+ this.statusCode = httpStatus;
+ }
+
+ public HttpOperation(final Logger log, final URL url, final String method) {
+ this.log = log;
+ this.url = url;
+ this.method = method;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ public String getHost() {
+ return url.getHost();
+ }
+
+ public int getStatusCode() {
+ return statusCode;
+ }
+
+ public String getStatusDescription() {
+ return statusDescription;
+ }
+
+ public String getStorageErrorCode() {
+ return storageErrorCode;
+ }
+
+ public String getStorageErrorMessage() {
+ return storageErrorMessage;
+ }
+
+ public abstract String getClientRequestId();
+
+ public String getExpectedAppendPos() {
+ return expectedAppendPos;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setMaskForSAS() {
+ shouldMask = true;
+ }
+
+ public int getBytesSent() {
+ return bytesSent;
+ }
+
+ public int getExpectedBytesToBeSent() {
+ return expectedBytesToBeSent;
+ }
+
+ public long getBytesReceived() {
+ return bytesReceived;
+ }
+
+ public URL getUrl() {
+ return url;
+ }
+
+ public ListResultSchema getListResultSchema() {
+ return listResultSchema;
+ }
+
+ public abstract String getResponseHeader(String httpHeader);
+
+ void setExpectedBytesToBeSent(int expectedBytesToBeSent) {
+ this.expectedBytesToBeSent = expectedBytesToBeSent;
+ }
+
+ void setStatusCode(int statusCode) {
+ this.statusCode = statusCode;
+ }
+
+ void setStatusDescription(String statusDescription) {
+ this.statusDescription = statusDescription;
+ }
+
+ void setBytesSent(int bytesSent) {
+ this.bytesSent = bytesSent;
+ }
+
+ void setSendRequestTimeMs(long sendRequestTimeMs) {
+ this.sendRequestTimeMs = sendRequestTimeMs;
+ }
+
+ void setRecvResponseTimeMs(long recvResponseTimeMs) {
+ this.recvResponseTimeMs = recvResponseTimeMs;
+ }
+
+ void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ void setConnectionTimeMs(long connectionTimeMs) {
+ this.connectionTimeMs = connectionTimeMs;
+ }
+
+ // Returns a trace message for the request
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(statusCode);
+ sb.append(",");
+ sb.append(storageErrorCode);
+ sb.append(",");
+ sb.append(expectedAppendPos);
+ sb.append(",cid=");
+ sb.append(getClientRequestId());
+ sb.append(",rid=");
+ sb.append(requestId);
+ sb.append(",connMs=");
+ sb.append(connectionTimeMs);
+ sb.append(",sendMs=");
+ sb.append(sendRequestTimeMs);
+ sb.append(",recvMs=");
+ sb.append(recvResponseTimeMs);
+ sb.append(",sent=");
+ sb.append(bytesSent);
+ sb.append(",recv=");
+ sb.append(bytesReceived);
+ sb.append(",");
+ sb.append(method);
+ sb.append(",");
+ sb.append(getMaskedUrl());
+ return sb.toString();
+ }
+
+ // Returns a trace message for the ABFS API logging service to consume
+ public String getLogString() {
+
+ final StringBuilder sb = new StringBuilder();
+ sb.append("s=")
+ .append(statusCode)
+ .append(" e=")
+ .append(storageErrorCode)
+ .append(" ci=")
+ .append(getClientRequestId())
+ .append(" ri=")
+ .append(requestId)
+
+ .append(" ct=")
+ .append(connectionTimeMs)
+ .append(" st=")
+ .append(sendRequestTimeMs)
+ .append(" rt=")
+ .append(recvResponseTimeMs)
+
+ .append(" bs=")
+ .append(bytesSent)
+ .append(" br=")
+ .append(bytesReceived)
+ .append(" m=")
+ .append(method)
+ .append(" u=")
+ .append(getMaskedEncodedUrl());
+
+ return sb.toString();
+ }
+
+ public String getMaskedUrl() {
+ if (!shouldMask) {
+ return url.toString();
+ }
+ if (maskedUrl != null) {
+ return maskedUrl;
+ }
+ maskedUrl = UriUtils.getMaskedUrl(url);
+ return maskedUrl;
+ }
+
+ public String getMaskedEncodedUrl() {
+ if (maskedEncodedUrl != null) {
+ return maskedEncodedUrl;
+ }
+ maskedEncodedUrl = UriUtils.encodedUrlStr(getMaskedUrl());
+ return maskedEncodedUrl;
+ }
+
+ public abstract void sendPayload(byte[] buffer, int offset, int length)
throws
+ IOException;
+
+ public abstract void processResponse(byte[] buffer,
+ int offset,
+ int length) throws IOException;
+
+ public abstract void setRequestProperty(String key, String value);
+
+ void parseResponse(final byte[] buffer,
+ final int offset,
+ final int length) throws IOException {
+ long startTime;
+ if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) {
+ // If it is HEAD, and it is ERROR
+ return;
+ }
+
+ startTime = System.nanoTime();
+
+ if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) {
+ processStorageErrorResponse();
+ this.recvResponseTimeMs += elapsedTimeMs(startTime);
+ String contentLength = getResponseHeader(
+ HttpHeaderConfigurations.CONTENT_LENGTH);
+ if (contentLength != null) {
+ this.bytesReceived = Long.parseLong(contentLength);
+ } else {
+ this.bytesReceived = 0L;
+ }
+
+ } else {
+ // consume the input stream to release resources
+ int totalBytesRead = 0;
+
+ try (InputStream stream = getContentInputStream()) {
+ if (isNullInputStream(stream)) {
+ return;
+ }
+ boolean endOfStream = false;
+
+ // this is a list operation and need to retrieve the data
+ // need a better solution
+ if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method)
+ && buffer == null) {
+ parseListFilesResponse(stream);
+ } else {
+ if (buffer != null) {
+ while (totalBytesRead < length) {
+ int bytesRead = stream.read(buffer, offset + totalBytesRead,
+ length
+ - totalBytesRead);
+ if (bytesRead == -1) {
+ endOfStream = true;
+ break;
+ }
+ totalBytesRead += bytesRead;
+ }
+ }
+ if (!endOfStream && stream.read() != -1) {
+ // read and discard
+ int bytesRead = 0;
+ byte[] b = new byte[CLEAN_UP_BUFFER_SIZE];
+ while ((bytesRead = stream.read(b)) >= 0) {
+ totalBytesRead += bytesRead;
+ }
+ }
+ }
+ } catch (IOException ex) {
+ log.warn("IO/Network error: {} {}: {}",
+ method, getMaskedUrl(), ex.getMessage());
+ log.debug("IO Error: ", ex);
Review Comment:
It is not identifiable from which instance this is thrown; the log message
should include some identifier.
> [ABFS]: ApacheHttpClient adaptation as network library
> ------------------------------------------------------
>
> Key: HADOOP-19120
> URL: https://issues.apache.org/jira/browse/HADOOP-19120
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0
> Reporter: Pranav Saxena
> Assignee: Pranav Saxena
> Priority: Major
> Labels: pull-request-available
>
> Apache HttpClient is more feature-rich and flexible and gives applications
> more granular control over networking parameters.
> ABFS currently relies on the JDK-net library. This library is managed by
> OpenJDK and has no performance problems. However, it limits the application's
> control over networking, and there are very few APIs and hooks exposed that
> the application can use to get metrics or to choose which connection should
> be reused and when. ApacheHttpClient will give important hooks to fetch
> important metrics and control networking parameters.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]